You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/28 06:10:15 UTC

[iotdb] branch master updated: [IOTDB-2967] New writing process of cluster (#5656)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a9fd419fbd [IOTDB-2967] New writing process of cluster (#5656)
a9fd419fbd is described below

commit a9fd419fbd339591791dde185f856eb117169afc
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Apr 28 14:10:10 2022 +0800

    [IOTDB-2967] New writing process of cluster (#5656)
---
 .../apache/iotdb/db/mpp/execution/Coordinator.java |   2 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  | 130 +-----
 .../iotdb/db/mpp/sql/analyze/SchemaValidator.java  |  57 +++
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   | 154 +++----
 .../plan/node/write/InsertMultiTabletsNode.java    |  89 +++-
 .../sql/planner/plan/node/write/InsertNode.java    |  77 ++--
 .../sql/planner/plan/node/write/InsertRowNode.java | 365 ++++++++++-----
 .../planner/plan/node/write/InsertRowsNode.java    |  89 +++-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  75 ++-
 .../planner/plan/node/write/InsertTabletNode.java  | 505 +++++++++++----------
 .../db/mpp/sql/statement/crud/BatchInsert.java     |  37 ++
 .../sql/statement/crud/InsertBaseStatement.java    |  31 --
 .../crud/InsertMultiTabletsStatement.java          |  11 -
 .../mpp/sql/statement/crud/InsertRowStatement.java |  94 ----
 .../crud/InsertRowsOfOneDeviceStatement.java       |  18 -
 .../sql/statement/crud/InsertRowsStatement.java    |  18 -
 .../sql/statement/crud/InsertTabletStatement.java  |  51 +--
 .../service/thrift/impl/InternalServiceImpl.java   |  25 +-
 .../db/engine/storagegroup/DataRegionTest.java     |  44 +-
 .../engine/storagegroup/TsFileProcessorV2Test.java |  25 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |  33 +-
 .../write/InsertMultiTabletsNodeSerdeTest.java     | 101 +++++
 .../plan/node/write/InsertRowNodeSerdeTest.java    | 134 ++++--
 .../plan/node/write/InsertRowsNodeSerdeTest.java   |  72 +++
 .../write/InsertRowsOfOneDeviceNodeSerdeTest.java  |  74 +++
 .../plan/node/write/InsertTabletNodeSerdeTest.java |  96 +++-
 26 files changed, 1524 insertions(+), 883 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 7e0c78b268..9f02665c0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -49,7 +49,7 @@ public class Coordinator {
   private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
 
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
-  private static final int COORDINATOR_EXECUTOR_SIZE = 1;
+  private static final int COORDINATOR_EXECUTOR_SIZE = 10;
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index a0c55fdcf0..fb2cfdf6db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -299,68 +298,36 @@ public class Analyzer {
     public Analysis visitInsertTablet(
         InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
       context.setQueryType(QueryType.WRITE);
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaWithAutoCreate(
-              insertTabletStatement.getDevicePath(),
-              insertTabletStatement.getMeasurements(),
-              insertTabletStatement.getDataTypes(),
-              insertTabletStatement.isAligned());
-
-      if (!insertTabletStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
 
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
       dataPartitionQueryParam.setTimePartitionSlotList(
           insertTabletStatement.getTimePartitionSlots());
-      sgNameToQueryParamsMap.put(
-          schemaTree.getBelongedStorageGroup(insertTabletStatement.getDevicePath()),
-          Collections.singletonList(dataPartitionQueryParam));
-      DataPartition dataPartition;
-      dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+      DataPartition dataPartition =
+          partitionFetcher.getOrCreateDataPartition(
+              Collections.singletonList(dataPartitionQueryParam));
 
       Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
       analysis.setStatement(insertTabletStatement);
       analysis.setDataPartitionInfo(dataPartition);
+
       return analysis;
     }
 
     @Override
     public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
       context.setQueryType(QueryType.WRITE);
-      // TODO remove duplicate
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaWithAutoCreate(
-              insertRowStatement.getDevicePath(),
-              insertRowStatement.getMeasurements(),
-              insertRowStatement.getDataTypes(),
-              insertRowStatement.isAligned());
-
-      try {
-        insertRowStatement.transferType(schemaTree);
-      } catch (QueryProcessException e) {
-        throw new SemanticException(e.getMessage());
-      }
 
-      if (!insertRowStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
-
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
       dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
-      sgNameToQueryParamsMap.put(
-          schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
-          Collections.singletonList(dataPartitionQueryParam));
+
       DataPartition dataPartition =
-          partitionFetcher.getOrCreateDataPartition(sgNameToQueryParamsMap);
+          partitionFetcher.getOrCreateDataPartition(
+              Collections.singletonList(dataPartitionQueryParam));
 
       Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
       analysis.setStatement(insertRowStatement);
       analysis.setDataPartitionInfo(dataPartition);
 
@@ -371,41 +338,21 @@ public class Analyzer {
     public Analysis visitInsertRows(
         InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
       context.setQueryType(QueryType.WRITE);
-      // TODO remove duplicate
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaListWithAutoCreate(
-              insertRowsStatement.getDevicePaths(),
-              insertRowsStatement.getMeasurementsList(),
-              insertRowsStatement.getDataTypesList(),
-              insertRowsStatement.getAlignedList());
 
-      try {
-        insertRowsStatement.transferType(schemaTree);
-      } catch (QueryProcessException e) {
-        throw new SemanticException(e.getMessage());
-      }
-
-      if (!insertRowsStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
-
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+      List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
       for (InsertRowStatement insertRowStatement :
           insertRowsStatement.getInsertRowStatementList()) {
         DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
         dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
         dataPartitionQueryParam.setTimePartitionSlotList(
             insertRowStatement.getTimePartitionSlots());
-        sgNameToQueryParamsMap
-            .computeIfAbsent(
-                schemaTree.getBelongedStorageGroup(insertRowStatement.getDevicePath()),
-                key -> new ArrayList<>())
-            .add(dataPartitionQueryParam);
+        dataPartitionQueryParams.add(dataPartitionQueryParam);
       }
-      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+      DataPartition dataPartition =
+          partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams);
 
       Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
       analysis.setStatement(insertRowsStatement);
       analysis.setDataPartitionInfo(dataPartition);
 
@@ -416,35 +363,21 @@ public class Analyzer {
     public Analysis visitInsertMultiTablets(
         InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
       context.setQueryType(QueryType.WRITE);
-      // TODO remove duplicate
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaListWithAutoCreate(
-              insertMultiTabletsStatement.getDevicePaths(),
-              insertMultiTabletsStatement.getMeasurementsList(),
-              insertMultiTabletsStatement.getDataTypesList(),
-              insertMultiTabletsStatement.getAlignedList());
-
-      if (!insertMultiTabletsStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
 
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
+      List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
       for (InsertTabletStatement insertTabletStatement :
           insertMultiTabletsStatement.getInsertTabletStatementList()) {
         DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
         dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
         dataPartitionQueryParam.setTimePartitionSlotList(
             insertTabletStatement.getTimePartitionSlots());
-        sgNameToQueryParamsMap
-            .computeIfAbsent(
-                schemaTree.getBelongedStorageGroup(insertTabletStatement.getDevicePath()),
-                key -> new ArrayList<>())
-            .add(dataPartitionQueryParam);
+        dataPartitionQueryParams.add(dataPartitionQueryParam);
       }
-      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+      DataPartition dataPartition =
+          partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams);
 
       Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
       analysis.setStatement(insertMultiTabletsStatement);
       analysis.setDataPartitionInfo(dataPartition);
 
@@ -455,37 +388,18 @@ public class Analyzer {
     public Analysis visitInsertRowsOfOneDevice(
         InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
       context.setQueryType(QueryType.WRITE);
-      // TODO remove duplicate
-      SchemaTree schemaTree =
-          schemaFetcher.fetchSchemaWithAutoCreate(
-              insertRowsOfOneDeviceStatement.getDevicePath(),
-              insertRowsOfOneDeviceStatement.getMeasurements(),
-              insertRowsOfOneDeviceStatement.getDataTypes(),
-              insertRowsOfOneDeviceStatement.isAligned());
 
-      try {
-        insertRowsOfOneDeviceStatement.transferType(schemaTree);
-      } catch (QueryProcessException e) {
-        throw new SemanticException(e.getMessage());
-      }
-
-      if (!insertRowsOfOneDeviceStatement.checkDataType(schemaTree)) {
-        throw new SemanticException("Data type mismatch");
-      }
-
-      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
       dataPartitionQueryParam.setDevicePath(
           insertRowsOfOneDeviceStatement.getDevicePath().getFullPath());
       dataPartitionQueryParam.setTimePartitionSlotList(
           insertRowsOfOneDeviceStatement.getTimePartitionSlots());
-      sgNameToQueryParamsMap.put(
-          schemaTree.getBelongedStorageGroup(insertRowsOfOneDeviceStatement.getDevicePath()),
-          Collections.singletonList(dataPartitionQueryParam));
-      DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+
+      DataPartition dataPartition =
+          partitionFetcher.getOrCreateDataPartition(
+              Collections.singletonList(dataPartitionQueryParam));
 
       Analysis analysis = new Analysis();
-      analysis.setSchemaTree(schemaTree);
       analysis.setStatement(insertRowsOfOneDeviceStatement);
       analysis.setDataPartitionInfo(dataPartition);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
new file mode 100644
index 0000000000..153da87d8e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/SchemaValidator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
+
+public class SchemaValidator {
+
+  private static final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+
+  public static SchemaTree validate(InsertNode insertNode) {
+
+    SchemaTree schemaTree;
+    if (insertNode instanceof BatchInsert) {
+      BatchInsert batchInsert = (BatchInsert) insertNode;
+      schemaTree =
+          schemaFetcher.fetchSchemaListWithAutoCreate(
+              batchInsert.getDevicePaths(),
+              batchInsert.getMeasurementsList(),
+              batchInsert.getDataTypesList(),
+              batchInsert.getAlignedList());
+    } else {
+      schemaTree =
+          schemaFetcher.fetchSchemaWithAutoCreate(
+              insertNode.getDevicePath(),
+              insertNode.getMeasurements(),
+              insertNode.getDataTypes(),
+              insertNode.isAligned());
+    }
+
+    if (!insertNode.validateSchema(schemaTree)) {
+      throw new SemanticException("Data type mismatch");
+    }
+
+    return schemaTree;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 230bfc8d3f..6e752421b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
@@ -31,6 +30,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSe
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
@@ -57,10 +57,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowTimeSeriesStatement;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -231,20 +228,12 @@ public class LogicalPlanner {
     @Override
     public PlanNode visitInsertTablet(
         InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
-      // set schema in insert node
       // convert insert statement to insert node
-      DeviceSchemaInfo deviceSchemaInfo =
-          analysis
-              .getSchemaTree()
-              .searchDeviceSchemaInfo(
-                  insertTabletStatement.getDevicePath(),
-                  Arrays.asList(insertTabletStatement.getMeasurements()));
-      List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
       return new InsertTabletNode(
           context.getQueryId().genPlanNodeId(),
           insertTabletStatement.getDevicePath(),
           insertTabletStatement.isAligned(),
-          measurementSchemas.toArray(new MeasurementSchema[0]),
+          insertTabletStatement.getMeasurements(),
           insertTabletStatement.getDataTypes(),
           insertTabletStatement.getTimes(),
           insertTabletStatement.getBitMaps(),
@@ -254,60 +243,16 @@ public class LogicalPlanner {
 
     @Override
     public PlanNode visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
-      // set schema in insert node
       // convert insert statement to insert node
-      DeviceSchemaInfo deviceSchemaInfo =
-          analysis
-              .getSchemaTree()
-              .searchDeviceSchemaInfo(
-                  insertRowStatement.getDevicePath(),
-                  Arrays.asList(insertRowStatement.getMeasurements()));
-      List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
       return new InsertRowNode(
           context.getQueryId().genPlanNodeId(),
           insertRowStatement.getDevicePath(),
           insertRowStatement.isAligned(),
-          measurementSchemas.toArray(new MeasurementSchema[0]),
+          insertRowStatement.getMeasurements(),
           insertRowStatement.getDataTypes(),
           insertRowStatement.getTime(),
-          insertRowStatement.getValues());
-    }
-
-    @Override
-    public PlanNode visitShowTimeSeries(
-        ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
-      QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
-      planBuilder.planTimeSeriesMetaSource(
-          showTimeSeriesStatement.getPathPattern(),
-          showTimeSeriesStatement.getKey(),
-          showTimeSeriesStatement.getValue(),
-          showTimeSeriesStatement.getLimit(),
-          showTimeSeriesStatement.getOffset(),
-          showTimeSeriesStatement.isOrderByHeat(),
-          showTimeSeriesStatement.isContains(),
-          showTimeSeriesStatement.isPrefixPath());
-      planBuilder.planSchemaMerge(showTimeSeriesStatement.isOrderByHeat());
-      if (showTimeSeriesStatement.getLimit() > 0) {
-        planBuilder.planOffset(showTimeSeriesStatement.getOffset());
-        planBuilder.planLimit(showTimeSeriesStatement.getLimit());
-      }
-      return planBuilder.getRoot();
-    }
-
-    @Override
-    public PlanNode visitShowDevices(
-        ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
-      QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
-      planBuilder.planDeviceSchemaSource(
-          showDevicesStatement.getPathPattern(),
-          showDevicesStatement.getLimit(),
-          showDevicesStatement.getOffset(),
-          showDevicesStatement.isPrefixPath(),
-          showDevicesStatement.hasSgCol());
-      planBuilder.planSchemaMerge(false);
-      planBuilder.planOffset(showDevicesStatement.getOffset());
-      planBuilder.planLimit(showDevicesStatement.getLimit());
-      return planBuilder.getRoot();
+          insertRowStatement.getValues(),
+          insertRowStatement.isNeedInferType());
     }
 
     @Override
@@ -345,28 +290,21 @@ public class LogicalPlanner {
     @Override
     public PlanNode visitInsertRows(
         InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
-      // set schema in insert node
       // convert insert statement to insert node
       InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
       for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
         InsertRowStatement insertRowStatement =
             insertRowsStatement.getInsertRowStatementList().get(i);
-        DeviceSchemaInfo deviceSchemaInfo =
-            analysis
-                .getSchemaTree()
-                .searchDeviceSchemaInfo(
-                    insertRowStatement.getDevicePath(),
-                    Arrays.asList(insertRowStatement.getMeasurements()));
-        List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
         insertRowsNode.addOneInsertRowNode(
             new InsertRowNode(
                 insertRowsNode.getPlanNodeId(),
                 insertRowStatement.getDevicePath(),
                 insertRowStatement.isAligned(),
-                measurementSchemas.toArray(new MeasurementSchema[0]),
+                insertRowStatement.getMeasurements(),
                 insertRowStatement.getDataTypes(),
                 insertRowStatement.getTime(),
-                insertRowStatement.getValues()),
+                insertRowStatement.getValues(),
+                insertRowStatement.isNeedInferType()),
             i);
       }
       return insertRowsNode;
@@ -375,65 +313,87 @@ public class LogicalPlanner {
     @Override
     public PlanNode visitInsertMultiTablets(
         InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
-      // set schema in insert node
       // convert insert statement to insert node
       InsertMultiTabletsNode insertMultiTabletsNode =
           new InsertMultiTabletsNode(context.getQueryId().genPlanNodeId());
-      List<InsertTabletNode> insertTabletNodeList = new ArrayList<>();
       for (int i = 0; i < insertMultiTabletsStatement.getInsertTabletStatementList().size(); i++) {
         InsertTabletStatement insertTabletStatement =
             insertMultiTabletsStatement.getInsertTabletStatementList().get(i);
-        DeviceSchemaInfo deviceSchemaInfo =
-            analysis
-                .getSchemaTree()
-                .searchDeviceSchemaInfo(
-                    insertTabletStatement.getDevicePath(),
-                    Arrays.asList(insertTabletStatement.getMeasurements()));
-        List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
-        insertTabletNodeList.add(
+        insertMultiTabletsNode.addInsertTabletNode(
             new InsertTabletNode(
                 insertMultiTabletsNode.getPlanNodeId(),
                 insertTabletStatement.getDevicePath(),
                 insertTabletStatement.isAligned(),
-                measurementSchemas.toArray(new MeasurementSchema[0]),
+                insertTabletStatement.getMeasurements(),
                 insertTabletStatement.getDataTypes(),
                 insertTabletStatement.getTimes(),
                 insertTabletStatement.getBitMaps(),
                 insertTabletStatement.getColumns(),
-                insertTabletStatement.getRowCount()));
+                insertTabletStatement.getRowCount()),
+            i);
       }
-      insertMultiTabletsNode.setInsertTabletNodeList(insertTabletNodeList);
       return insertMultiTabletsNode;
     }
 
     @Override
     public PlanNode visitInsertRowsOfOneDevice(
         InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
-      // set schema in insert node
       // convert insert statement to insert node
-      InsertRowsNode insertRowsNode = new InsertRowsNode(context.getQueryId().genPlanNodeId());
+      InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode =
+          new InsertRowsOfOneDeviceNode(context.getQueryId().genPlanNodeId());
       for (int i = 0; i < insertRowsOfOneDeviceStatement.getInsertRowStatementList().size(); i++) {
         InsertRowStatement insertRowStatement =
             insertRowsOfOneDeviceStatement.getInsertRowStatementList().get(i);
-        DeviceSchemaInfo deviceSchemaInfo =
-            analysis
-                .getSchemaTree()
-                .searchDeviceSchemaInfo(
-                    insertRowStatement.getDevicePath(),
-                    Arrays.asList(insertRowStatement.getMeasurements()));
-        List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
-        insertRowsNode.addOneInsertRowNode(
+        insertRowsOfOneDeviceNode.addOneInsertRowNode(
             new InsertRowNode(
-                insertRowsNode.getPlanNodeId(),
+                insertRowsOfOneDeviceNode.getPlanNodeId(),
                 insertRowStatement.getDevicePath(),
                 insertRowStatement.isAligned(),
-                measurementSchemas.toArray(new MeasurementSchema[0]),
+                insertRowStatement.getMeasurements(),
                 insertRowStatement.getDataTypes(),
                 insertRowStatement.getTime(),
-                insertRowStatement.getValues()),
+                insertRowStatement.getValues(),
+                insertRowStatement.isNeedInferType()),
             i);
       }
-      return insertRowsNode;
+      return insertRowsOfOneDeviceNode;
+    }
+
+    @Override
+    public PlanNode visitShowTimeSeries(
+        ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) {
+      QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+      planBuilder.planTimeSeriesMetaSource(
+          showTimeSeriesStatement.getPathPattern(),
+          showTimeSeriesStatement.getKey(),
+          showTimeSeriesStatement.getValue(),
+          showTimeSeriesStatement.getLimit(),
+          showTimeSeriesStatement.getOffset(),
+          showTimeSeriesStatement.isOrderByHeat(),
+          showTimeSeriesStatement.isContains(),
+          showTimeSeriesStatement.isPrefixPath());
+      planBuilder.planSchemaMerge(showTimeSeriesStatement.isOrderByHeat());
+      if (showTimeSeriesStatement.getLimit() > 0) {
+        planBuilder.planOffset(showTimeSeriesStatement.getOffset());
+        planBuilder.planLimit(showTimeSeriesStatement.getLimit());
+      }
+      return planBuilder.getRoot();
+    }
+
+    @Override
+    public PlanNode visitShowDevices(
+        ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
+      QueryPlanBuilder planBuilder = new QueryPlanBuilder(context);
+      planBuilder.planDeviceSchemaSource(
+          showDevicesStatement.getPathPattern(),
+          showDevicesStatement.getLimit(),
+          showDevicesStatement.getOffset(),
+          showDevicesStatement.isPrefixPath(),
+          showDevicesStatement.hasSgCol());
+      planBuilder.planSchemaMerge(false);
+      planBuilder.planOffset(showDevicesStatement.getOffset());
+      planBuilder.planLimit(showDevicesStatement.getLimit());
+      return planBuilder.getRoot();
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 2c15674023..49088f418d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -21,11 +21,15 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -36,7 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class InsertMultiTabletsNode extends InsertNode {
+public class InsertMultiTabletsNode extends InsertNode implements BatchInsert {
 
   /**
    * the value is used to indict the parent InsertTabletNode's index when the parent
@@ -114,6 +118,16 @@ public class InsertMultiTabletsNode extends InsertNode {
     parentInsertTabletNodeIndexList.add(parentIndex);
   }
 
+  @Override
+  public boolean validateSchema(SchemaTree schemaTree) {
+    for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
+      if (!insertTabletNode.validateSchema(schemaTree)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<TRegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
@@ -177,12 +191,81 @@ public class InsertMultiTabletsNode extends InsertNode {
     return null;
   }
 
+  /** Returns the device path of each contained InsertTabletNode, in list order. */
+  @Override
+  public List<PartialPath> getDevicePaths() {
+    List<PartialPath> devicePaths = new ArrayList<>(insertTabletNodeList.size());
+    insertTabletNodeList.forEach(tabletNode -> devicePaths.add(tabletNode.devicePath));
+    return devicePaths;
+  }
+
+  /** Returns the measurement-name array of each contained InsertTabletNode, in list order. */
+  @Override
+  public List<String[]> getMeasurementsList() {
+    List<String[]> perTabletMeasurements = new ArrayList<>(insertTabletNodeList.size());
+    insertTabletNodeList.forEach(tabletNode -> perTabletMeasurements.add(tabletNode.measurements));
+    return perTabletMeasurements;
+  }
+
+  /** Returns the data-type array of each contained InsertTabletNode, in list order. */
+  @Override
+  public List<TSDataType[]> getDataTypesList() {
+    List<TSDataType[]> perTabletTypes = new ArrayList<>(insertTabletNodeList.size());
+    insertTabletNodeList.forEach(tabletNode -> perTabletTypes.add(tabletNode.dataTypes));
+    return perTabletTypes;
+  }
+
+  /** Returns the aligned flag of each contained InsertTabletNode, in list order. */
+  @Override
+  public List<Boolean> getAlignedList() {
+    List<Boolean> alignedFlags = new ArrayList<>(insertTabletNodeList.size());
+    insertTabletNodeList.forEach(tabletNode -> alignedFlags.add(tabletNode.isAligned));
+    return alignedFlags;
+  }
+
   public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
-    return null;
+    // Read order: child count, each child's sub-serialized form, one parent index per child,
+    // and finally the plan node id that the parent and all children share.
+    int childCount = byteBuffer.getInt();
+
+    List<InsertTabletNode> children = new ArrayList<>();
+    for (int read = 0; read < childCount; read++) {
+      // placeholder id; the real one is only known once the trailing id has been read
+      InsertTabletNode child = new InsertTabletNode(new PlanNodeId(""));
+      child.subDeserialize(byteBuffer);
+      children.add(child);
+    }
+
+    List<Integer> parentIndexes = new ArrayList<>();
+    for (int read = 0; read < childCount; read++) {
+      parentIndexes.add(byteBuffer.getInt());
+    }
+
+    PlanNodeId sharedId = PlanNodeId.deserialize(byteBuffer);
+    children.forEach(child -> child.setPlanNodeId(sharedId));
+
+    InsertMultiTabletsNode result = new InsertMultiTabletsNode(sharedId);
+    result.setInsertTabletNodeList(children);
+    result.setParentInsertTabletNodeIndexList(parentIndexes);
+    return result;
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Write order: type tag, child count, every child, then every parent index.
+    PlanNodeType.INSERT_MULTI_TABLET.serialize(byteBuffer);
+    byteBuffer.putInt(insertTabletNodeList.size());
+    insertTabletNodeList.forEach(child -> child.subSerialize(byteBuffer));
+    for (int parentIndex : parentInsertTabletNodeIndexList) {
+      byteBuffer.putInt(parentIndex);
+    }
+  }
 
   @Override
   public boolean equals(Object o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 32520a924b..14b1bf1a23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -72,20 +74,13 @@ public abstract class InsertNode extends WritePlanNode {
       PlanNodeId id,
       PartialPath devicePath,
       boolean isAligned,
-      MeasurementSchema[] measurementSchemas,
+      String[] measurements,
       TSDataType[] dataTypes) {
     super(id);
     this.devicePath = devicePath;
     this.isAligned = isAligned;
-    this.measurementSchemas = measurementSchemas;
+    this.measurements = measurements;
     this.dataTypes = dataTypes;
-
-    this.measurements = new String[measurementSchemas.length];
-    for (int i = 0; i < measurementSchemas.length; i++) {
-      if (measurementSchemas[i] != null) {
-        measurements[i] = measurementSchemas[i].getMeasurementId();
-      }
-    }
   }
 
   public TRegionReplicaSet getDataRegionReplicaSet() {
@@ -142,27 +137,23 @@ public abstract class InsertNode extends WritePlanNode {
 
   protected void serializeMeasurementSchemaToWAL(IWALByteBufferView buffer) {
+    // Writes every entry of measurementSchemas to the WAL buffer in array order. Entries are no
+    // longer null-checked here, so the array must not contain null elements at this point.
     for (MeasurementSchema measurementSchema : measurementSchemas) {
-      if (measurementSchema != null) {
-        WALWriteUtils.write(measurementSchema, buffer);
-      }
+      WALWriteUtils.write(measurementSchema, buffer);
     }
   }
 
   protected int serializeMeasurementSchemaSize() {
     int byteLen = 0;
     for (MeasurementSchema measurementSchema : measurementSchemas) {
-      if (measurementSchema != null) {
-        byteLen += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
-        byteLen += 3 * Byte.BYTES;
-        Map<String, String> props = measurementSchema.getProps();
-        if (props == null) {
-          byteLen += Integer.BYTES;
-        } else {
-          byteLen += Integer.BYTES;
-          for (Map.Entry<String, String> entry : props.entrySet()) {
-            byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
-            byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
-          }
+      byteLen += ReadWriteIOUtils.sizeToWrite(measurementSchema.getMeasurementId());
+      byteLen += 3 * Byte.BYTES;
+      Map<String, String> props = measurementSchema.getProps();
+      if (props == null) {
+        byteLen += Integer.BYTES;
+      } else {
+        byteLen += Integer.BYTES;
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+          byteLen += ReadWriteIOUtils.sizeToWrite(entry.getKey());
+          byteLen += ReadWriteIOUtils.sizeToWrite(entry.getValue());
         }
       }
     }
@@ -201,19 +192,37 @@ public abstract class InsertNode extends WritePlanNode {
     return dataRegionReplicaSet;
   }
 
-  @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {
-    throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
+  /**
+   * Validates this insert node against the given schema tree.
+   *
+   * @param schemaTree schema tree to check this node's device and measurements against
+   * @return true if the node passes validation, false otherwise
+   */
+  public abstract boolean validateSchema(SchemaTree schemaTree);
+
+  /**
+   * Looks up this node's device and measurement names in the schema tree and stores the
+   * measurement schemas found there on this node.
+   *
+   * @param schemaTree schema tree to search
+   */
+  public void setMeasurementSchemas(SchemaTree schemaTree) {
+    measurementSchemas =
+        schemaTree
+            .searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements))
+            .getMeasurementSchemaList()
+            .toArray(new MeasurementSchema[0]);
+  }
 
-  protected int countFailedMeasurements() {
-    int result = 0;
-    for (MeasurementSchema measurement : measurementSchemas) {
-      if (measurement == null) {
-        result++;
-      }
+  /**
+   * Marks the measurement at the given index as failed by setting {@code measurements[index]} to
+   * null; does nothing if that entry is already null. This method is overridden in InsertRowNode
+   * (and presumably in InsertTabletNode as well -- TODO confirm). After marking failed
+   * measurements, the failed values or columns would be null as well. We'd better use
+   * "measurements[index] == null" to determine if the measurement failed.
+   *
+   * @param index failed measurement index
+   * @param e cause of the failure; not used yet (see the partial-insert todo in the body)
+   */
+  public void markFailedMeasurementInsertion(int index, Exception e) {
+    // todo partial insert
+    if (measurements[index] == null) {
+      return;
     }
-    return result;
+    // Disabled until partial insert is supported: remember the failed measurement names.
+    //    if (failedMeasurements == null) {
+    //      failedMeasurements = new ArrayList<>();
+    //    }
+    //    failedMeasurements.add(measurements[index]);
+    measurements[index] = null;
+  }
+
+  /** Always throws: the base class has no attribute serialization of its own. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    String message = "serializeAttributes of InsertNode is not implemented";
+    throw new NotImplementedException(message);
+  }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 84d91add1e..94654b50fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -19,16 +19,23 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
@@ -53,11 +60,15 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
 
   private static final Logger logger = LoggerFactory.getLogger(InsertRowNode.class);
 
+  private static final byte TYPE_RAW_STRING = -1;
+
   private static final byte TYPE_NULL = -2;
 
   private long time;
   private Object[] values;
 
+  private boolean isNeedInferType = false;
+
   public InsertRowNode(PlanNodeId id) {
     super(id);
   }
@@ -66,13 +77,15 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
       PlanNodeId id,
       PartialPath devicePath,
       boolean isAligned,
-      MeasurementSchema[] measurements,
+      String[] measurements,
       TSDataType[] dataTypes,
       long time,
-      Object[] values) {
+      Object[] values,
+      boolean isNeedInferType) {
     super(id, devicePath, isAligned, measurements, dataTypes);
     this.time = time;
     this.values = values;
+    this.isNeedInferType = isNeedInferType;
   }
 
   @Override
@@ -118,6 +131,131 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     return null;
   }
 
+  /** Returns the value array of this row (the internal array itself, not a copy). */
+  public Object[] getValues() {
+    return this.values;
+  }
+
+  /** Replaces the value array of this row; the given array is stored as-is. */
+  public void setValues(Object[] values) {
+    this.values = values;
+  }
+
+  /** Returns the timestamp of this row. */
+  public long getTime() {
+    return this.time;
+  }
+
+  /** Sets the timestamp of this row. */
+  public void setTime(long time) {
+    this.time = time;
+  }
+
+  /** Returns whether the values still have to be converted to their schema data types. */
+  public boolean isNeedInferType() {
+    return this.isNeedInferType;
+  }
+
+  /** Sets whether the values still have to be converted to their schema data types. */
+  public void setNeedInferType(boolean needInferType) {
+    this.isNeedInferType = needInferType;
+  }
+
+  /**
+   * Validates this row against the schema registered for its device and drops the measurements
+   * that failed validation.
+   *
+   * <p>When type inference is pending, the raw values are converted via {@link #transferType}.
+   * Otherwise the aligned flag and every declared data type are compared with the schema. A
+   * missing or mismatching measurement is marked as failed when partial insert is enabled and
+   * rejects the whole row when it is disabled, which is the same policy transferType applies.
+   *
+   * @param schemaTree schema tree to look up the device and its measurements in
+   * @return true if the row (possibly reduced to its valid measurements) can be inserted
+   */
+  @Override
+  public boolean validateSchema(SchemaTree schemaTree) {
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+
+    List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
+
+    if (isNeedInferType) {
+      try {
+        transferType(measurementSchemas);
+      } catch (QueryProcessException e) {
+        return false;
+      }
+    } else {
+      // todo partial insert
+      if (deviceSchemaInfo.isAligned() != isAligned) {
+        return false;
+      }
+
+      boolean partialInsertEnabled =
+          IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
+      for (int i = 0; i < measurementSchemas.size(); i++) {
+        MeasurementSchema schema = measurementSchemas.get(i);
+        if (schema != null && dataTypes[i] == schema.getType()) {
+          continue;
+        }
+        if (!partialInsertEnabled) {
+          // without partial insert a single bad measurement rejects the whole row
+          return false;
+        }
+        Exception cause;
+        if (schema == null) {
+          cause =
+              new PathNotExistException(
+                  devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]);
+        } else {
+          cause =
+              new DataTypeMismatchException(
+                  devicePath.getFullPath(), measurements[i], schema.getType(), dataTypes[i]);
+        }
+        markFailedMeasurementInsertion(i, cause);
+      }
+    }
+
+    removeFailedMeasurements();
+    return true;
+  }
+
+  /**
+   * Removes the entries of failed measurements from measurements, dataTypes and values. The
+   * three arrays are compacted by the same indexes (those where the measurement name is
+   * non-null), so a valid measurement that carries a null value keeps its slot and the arrays
+   * stay aligned with each other.
+   */
+  private void removeFailedMeasurements() {
+    int keptCount = 0;
+    for (String measurement : measurements) {
+      if (measurement != null) {
+        keptCount++;
+      }
+    }
+    if (keptCount == measurements.length) {
+      return;
+    }
+
+    String[] keptMeasurements = new String[keptCount];
+    TSDataType[] keptDataTypes = new TSDataType[keptCount];
+    Object[] keptValues = new Object[keptCount];
+    int target = 0;
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] == null) {
+        continue;
+      }
+      keptMeasurements[target] = measurements[i];
+      keptDataTypes[target] = dataTypes[i];
+      keptValues[target] = values[i];
+      target++;
+    }
+    measurements = keptMeasurements;
+    dataTypes = keptDataTypes;
+    values = keptValues;
+  }
+
+  /**
+   * Marks the measurement at {@code index} as failed and also nulls out its value and data type.
+   * Does nothing if the measurement was already marked.
+   *
+   * @param index failed measurement index
+   * @param e cause of the failure, passed on to the superclass implementation
+   */
+  @Override
+  public void markFailedMeasurementInsertion(int index, Exception e) {
+    boolean alreadyMarked = measurements[index] == null;
+    if (!alreadyMarked) {
+      super.markFailedMeasurementInsertion(index, e);
+      values[index] = null;
+      dataTypes[index] = null;
+    }
+  }
+
+  /**
+   * Converts the raw values of this row to the data types given by the measurement schemas
+   * (parsing each value's string form) and records those types in dataTypes. Does nothing unless
+   * type inference is still pending; the pending flag is cleared once the loop completes.
+   *
+   * <p>A measurement without a schema, or a value that cannot be parsed, is marked as failed when
+   * partial insert is enabled; otherwise the exception is thrown.
+   *
+   * @param measurementSchemas schemas in the same order as the measurements of this row
+   * @throws QueryProcessException if a measurement fails and partial insert is disabled
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  public void transferType(List<MeasurementSchema> measurementSchemas)
+      throws QueryProcessException {
+    if (!isNeedInferType) {
+      return;
+    }
+
+    for (int i = 0; i < measurementSchemas.size(); i++) {
+      MeasurementSchema schema = measurementSchemas.get(i);
+
+      if (schema == null) {
+        QueryProcessException pathMissing =
+            new QueryProcessException(
+                new PathNotExistException(
+                    devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          throw pathMissing;
+        }
+        markFailedMeasurementInsertion(i, pathMissing);
+        continue;
+      }
+
+      dataTypes[i] = schema.getType();
+      try {
+        values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+      } catch (Exception e) {
+        logger.warn(
+            "{}.{} data type is not consistent, input {}, registered {}",
+            devicePath,
+            measurements[i],
+            values[i],
+            dataTypes[i]);
+        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          throw e;
+        }
+        markFailedMeasurementInsertion(i, e);
+      }
+    }
+
+    isNeedInferType = false;
+  }
+
   @Override
   public int serializedSize() {
     int size = 0;
@@ -173,33 +311,12 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     return size;
   }
 
-  @Override
-  public int hashCode() {
-    int result = Objects.hash(super.hashCode(), time);
-    result = 31 * result + Arrays.hashCode(values);
-    return result;
-  }
-
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Writes the INSERT_ROW type tag first, then the row payload via subSerialize.
     PlanNodeType.INSERT_ROW.serialize(byteBuffer);
     subSerialize(byteBuffer);
   }
 
-  public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
-    // TODO: (xingtanzjr) remove placeholder
-    InsertRowNode insertNode = new InsertRowNode(new PlanNodeId(""));
-    insertNode.setTime(byteBuffer.getLong());
-    try {
-      insertNode.setDevicePath(new PartialPath(ReadWriteIOUtils.readString(byteBuffer)));
-    } catch (IllegalPathException e) {
-      throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
-    }
-    insertNode.deserializeMeasurementsAndValues(byteBuffer);
-    insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
-    return insertNode;
-  }
-
   void subSerialize(ByteBuffer buffer) {
     buffer.putLong(time);
     ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
@@ -207,12 +324,18 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   void serializeMeasurementsAndValues(ByteBuffer buffer) {
-    buffer.putInt(measurementSchemas.length - countFailedMeasurements());
+    buffer.putInt(measurements.length);
 
-    for (MeasurementSchema measurementSchema : measurementSchemas) {
-      if (measurementSchema != null) {
+    // check whether has measurement schemas or not
+    buffer.put((byte) (measurementSchemas != null ? 1 : 0));
+    if (measurementSchemas != null) {
+      for (MeasurementSchema measurementSchema : measurementSchemas) {
         measurementSchema.serializeTo(buffer);
       }
+    } else {
+      for (String measurement : measurements) {
+        ReadWriteIOUtils.write(measurement, buffer);
+      }
     }
 
     try {
@@ -221,16 +344,22 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
       logger.error("Failed to serialize values for {}", this, e);
     }
 
+    buffer.put((byte) (isNeedInferType ? 1 : 0));
     buffer.put((byte) (isAligned ? 1 : 0));
   }
 
   private void putValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < values.length; i++) {
-      if (dataTypes[i] != null) {
-        if (values[i] == null) {
-          ReadWriteIOUtils.write(TYPE_NULL, buffer);
-          continue;
-        }
+      if (values[i] == null) {
+        ReadWriteIOUtils.write(TYPE_NULL, buffer);
+        continue;
+      }
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      if (dataTypes == null || dataTypes[i] == null) {
+        ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
+        ReadWriteIOUtils.write(values[i].toString(), buffer);
+      } else {
         ReadWriteIOUtils.write(dataTypes[i], buffer);
         switch (dataTypes[i]) {
           case BOOLEAN:
@@ -258,91 +387,39 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     }
   }
 
-  @Override
-  public void serializeToWAL(IWALByteBufferView buffer) {
-    buffer.putShort((short) PlanNodeType.INSERT_ROW.ordinal());
-    subSerialize(buffer);
-  }
-
-  void subSerialize(IWALByteBufferView buffer) {
-    buffer.putLong(time);
-    WALWriteUtils.write(devicePath.getFullPath(), buffer);
-    serializeMeasurementsAndValues(buffer);
+  /**
+   * Reads an InsertRowNode from the buffer: the row payload first, then the plan node id.
+   *
+   * @param byteBuffer buffer positioned at the start of the row payload
+   * @return the deserialized node
+   */
+  public static InsertRowNode deserialize(ByteBuffer byteBuffer) {
+    // TODO: (xingtanzjr) remove placeholder
+    PlanNodeId placeholderId = new PlanNodeId("");
+    InsertRowNode row = new InsertRowNode(placeholderId);
+    row.subDeserialize(byteBuffer);
+    PlanNodeId actualId = PlanNodeId.deserialize(byteBuffer);
+    row.setPlanNodeId(actualId);
+    return row;
   }
 
-  void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
-    buffer.putInt(measurementSchemas.length - countFailedMeasurements());
-
-    serializeMeasurementSchemaToWAL(buffer);
-
+  public void subDeserialize(ByteBuffer byteBuffer) {
+    time = byteBuffer.getLong();
     try {
-      putValues(buffer);
-    } catch (QueryProcessException e) {
-      logger.error("Failed to serialize values for {}", this, e);
-    }
-
-    buffer.put((byte) (isAligned ? 1 : 0));
-  }
-
-  private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
-    // todo remove serialize datatype after serializing measurement schema
-    for (int i = 0; i < values.length; i++) {
-      if (dataTypes[i] != null) {
-        if (values[i] == null) {
-          WALWriteUtils.write(TYPE_NULL, buffer);
-          continue;
-        }
-        WALWriteUtils.write(dataTypes[i], buffer);
-        switch (dataTypes[i]) {
-          case BOOLEAN:
-            WALWriteUtils.write((Boolean) values[i], buffer);
-            break;
-          case INT32:
-            WALWriteUtils.write((Integer) values[i], buffer);
-            break;
-          case INT64:
-            WALWriteUtils.write((Long) values[i], buffer);
-            break;
-          case FLOAT:
-            WALWriteUtils.write((Float) values[i], buffer);
-            break;
-          case DOUBLE:
-            WALWriteUtils.write((Double) values[i], buffer);
-            break;
-          case TEXT:
-            WALWriteUtils.write((Binary) values[i], buffer);
-            break;
-          default:
-            throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
-        }
-      }
+      devicePath = new PartialPath(ReadWriteIOUtils.readString(byteBuffer));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
     }
-  }
-
-  public Object[] getValues() {
-    return values;
-  }
-
-  public void setValues(Object[] values) {
-    this.values = values;
-  }
-
-  public long getTime() {
-    return time;
-  }
-
-  public void setTime(long time) {
-    this.time = time;
+    deserializeMeasurementsAndValues(byteBuffer);
   }
 
   void deserializeMeasurementsAndValues(ByteBuffer buffer) {
     int measurementSize = buffer.getInt();
 
     this.measurements = new String[measurementSize];
-    this.measurementSchemas = new MeasurementSchema[measurementSize];
-    for (int i = 0; i < measurementSize; i++) {
-      measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
-      measurements[i] = measurementSchemas[i].getMeasurementId();
+    boolean hasSchema = buffer.get() == 1;
+    if (hasSchema) {
+      this.measurementSchemas = new MeasurementSchema[measurementSize];
+      for (int i = 0; i < measurementSize; i++) {
+        measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+        measurements[i] = measurementSchemas[i].getMeasurementId();
+      }
+    } else {
+      for (int i = 0; i < measurementSize; i++) {
+        measurements[i] = ReadWriteIOUtils.readString(buffer);
+      }
     }
 
     this.dataTypes = new TSDataType[measurementSize];
@@ -353,14 +430,18 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
       e.printStackTrace();
     }
 
+    isNeedInferType = buffer.get() == 1;
     isAligned = buffer.get() == 1;
   }
 
   /** Make sure the values is already inited before calling this */
   public void fillValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < dataTypes.length; i++) {
+      // types are not determined, the situation mainly occurs when the node uses string values
+      // and is forwarded to other nodes
       byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
-      if (typeNum == TYPE_NULL) {
+      if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
+        values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(buffer) : null;
         continue;
       }
       dataTypes[i] = TSDataType.values()[typeNum];
@@ -389,6 +470,65 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     }
   }
 
+  /** Writes this row to the WAL buffer: the INSERT_ROW ordinal as a short, then the payload. */
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    short typeOrdinal = (short) PlanNodeType.INSERT_ROW.ordinal();
+    buffer.putShort(typeOrdinal);
+    subSerialize(buffer);
+  }
+
+  /** Writes the timestamp, the full device path, then the measurements and values. */
+  void subSerialize(IWALByteBufferView buffer) {
+    buffer.putLong(time);
+    String fullDevicePath = devicePath.getFullPath();
+    WALWriteUtils.write(fullDevicePath, buffer);
+    serializeMeasurementsAndValues(buffer);
+  }
+
+  /**
+   * Writes the schema count, the measurement schemas, the values and finally the aligned flag.
+   * NOTE(review): a failure while writing the values is only logged, so the entry may be left
+   * partially written -- confirm this best-effort behavior is intended.
+   */
+  void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
+    int schemaCount = measurementSchemas.length;
+    buffer.putInt(schemaCount);
+    serializeMeasurementSchemaToWAL(buffer);
+
+    try {
+      putValues(buffer);
+    } catch (QueryProcessException e) {
+      logger.error("Failed to serialize values for {}", this, e);
+    }
+
+    byte alignedFlag = (byte) (isAligned ? 1 : 0);
+    buffer.put(alignedFlag);
+  }
+
+  /**
+   * Writes each value to the WAL buffer: TYPE_NULL for a null value, otherwise the data type
+   * followed by the value written according to that type.
+   *
+   * @throws QueryProcessException if a value has a data type other than the six handled below
+   */
+  private void putValues(IWALByteBufferView buffer) throws QueryProcessException {
+    // todo remove serialize datatype after serializing measurement schema
+    for (int i = 0; i < values.length; i++) {
+      Object value = values[i];
+      if (value == null) {
+        WALWriteUtils.write(TYPE_NULL, buffer);
+        continue;
+      }
+
+      TSDataType type = dataTypes[i];
+      WALWriteUtils.write(type, buffer);
+      switch (type) {
+        case BOOLEAN:
+          WALWriteUtils.write((Boolean) value, buffer);
+          break;
+        case INT32:
+          WALWriteUtils.write((Integer) value, buffer);
+          break;
+        case INT64:
+          WALWriteUtils.write((Long) value, buffer);
+          break;
+        case FLOAT:
+          WALWriteUtils.write((Float) value, buffer);
+          break;
+        case DOUBLE:
+          WALWriteUtils.write((Double) value, buffer);
+          break;
+        case TEXT:
+          WALWriteUtils.write((Binary) value, buffer);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + type);
+      }
+    }
+  }
+
   public static InsertRowNode deserialize(DataInputStream stream)
       throws IOException, IllegalPathException {
     // This method is used for deserialize from wal
@@ -458,6 +598,15 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     if (o == null || getClass() != o.getClass()) return false;
     if (!super.equals(o)) return false;
     InsertRowNode that = (InsertRowNode) o;
-    return time == that.time && Arrays.equals(values, that.values);
+    return time == that.time
+        && isNeedInferType == that.isNeedInferType
+        && Arrays.equals(values, that.values);
+  }
+
+  /** Combines the superclass hash, time and the infer-type flag with the hash of the values. */
+  @Override
+  public int hashCode() {
+    int baseHash = Objects.hash(super.hashCode(), time, isNeedInferType);
+    return 31 * baseHash + Arrays.hashCode(values);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 2cf83f8f2e..eccede30d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -22,11 +22,15 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.mpp.sql.statement.crud.BatchInsert;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -37,7 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class InsertRowsNode extends InsertNode {
+public class InsertRowsNode extends InsertNode implements BatchInsert {
 
   /**
    * Suppose there is an InsertRowsNode, which contains 5 InsertRowNodes,
@@ -100,6 +104,16 @@ public class InsertRowsNode extends InsertNode {
   @Override
   public void addChild(PlanNode child) {}
 
+  /**
+   * Validates every contained InsertRowNode against the given schema tree.
+   *
+   * @param schemaTree schema tree handed to each row's own validation
+   * @return true only if all rows validate; evaluation stops at the first failure
+   */
+  @Override
+  public boolean validateSchema(SchemaTree schemaTree) {
+    return insertRowNodeList.stream().allMatch(row -> row.validateSchema(schemaTree));
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -140,12 +154,81 @@ public class InsertRowsNode extends InsertNode {
     return null;
   }
 
+  /** Returns the device path of each contained InsertRowNode, in list order. */
+  @Override
+  public List<PartialPath> getDevicePaths() {
+    List<PartialPath> devicePaths = new ArrayList<>(insertRowNodeList.size());
+    insertRowNodeList.forEach(rowNode -> devicePaths.add(rowNode.devicePath));
+    return devicePaths;
+  }
+
+  /** Returns the measurement-name array of each contained InsertRowNode, in list order. */
+  @Override
+  public List<String[]> getMeasurementsList() {
+    List<String[]> perRowMeasurements = new ArrayList<>(insertRowNodeList.size());
+    insertRowNodeList.forEach(rowNode -> perRowMeasurements.add(rowNode.measurements));
+    return perRowMeasurements;
+  }
+
+  /** Returns the data-type array of each contained InsertRowNode, in list order. */
+  @Override
+  public List<TSDataType[]> getDataTypesList() {
+    List<TSDataType[]> perRowTypes = new ArrayList<>(insertRowNodeList.size());
+    insertRowNodeList.forEach(rowNode -> perRowTypes.add(rowNode.dataTypes));
+    return perRowTypes;
+  }
+
+  /** Returns the aligned flag of each contained InsertRowNode, in list order. */
+  @Override
+  public List<Boolean> getAlignedList() {
+    List<Boolean> alignedFlags = new ArrayList<>(insertRowNodeList.size());
+    insertRowNodeList.forEach(rowNode -> alignedFlags.add(rowNode.isAligned));
+    return alignedFlags;
+  }
+
   public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {
-    return null;
+    // Read order: row count, each row's sub-serialized form, one index per row, and finally
+    // the plan node id that the parent and all rows share.
+    int rowCount = byteBuffer.getInt();
+
+    List<InsertRowNode> rows = new ArrayList<>();
+    for (int read = 0; read < rowCount; read++) {
+      // placeholder id; the real one is only known once the trailing id has been read
+      InsertRowNode row = new InsertRowNode(new PlanNodeId(""));
+      row.subDeserialize(byteBuffer);
+      rows.add(row);
+    }
+
+    List<Integer> rowIndexes = new ArrayList<>();
+    for (int read = 0; read < rowCount; read++) {
+      rowIndexes.add(byteBuffer.getInt());
+    }
+
+    PlanNodeId sharedId = PlanNodeId.deserialize(byteBuffer);
+    rows.forEach(row -> row.setPlanNodeId(sharedId));
+
+    InsertRowsNode result = new InsertRowsNode(sharedId);
+    result.setInsertRowNodeList(rows);
+    result.setInsertRowNodeIndexList(rowIndexes);
+    return result;
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Write order: type tag, row count, every row, then every row index.
+    PlanNodeType.INSERT_ROWS.serialize(byteBuffer);
+    byteBuffer.putInt(insertRowNodeList.size());
+    insertRowNodeList.forEach(row -> row.subSerialize(byteBuffer));
+    for (int rowIndex : insertRowNodeIndexList) {
+      byteBuffer.putInt(rowIndex);
+    }
+  }
 
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 16abdd1049..1705d10f38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -22,13 +22,18 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -121,12 +126,15 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     return null;
   }
 
-  public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
-    return null;
-  }
-
   @Override
-  public void serialize(ByteBuffer byteBuffer) {}
+  public boolean validateSchema(SchemaTree schemaTree) {
+    // A batch validates only when each of its rows does. allMatch visits the rows in
+    // list order and stops at the first failing one, so later rows are not validated
+    // (the same effect as returning early from a loop).
+    return insertRowNodeList.stream()
+        .allMatch(
+            insertRowNode -> insertRowNode.validateSchema(schemaTree));
+  }
 
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
@@ -154,6 +162,63 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     return new ArrayList<>(splitMap.values());
   }
 
+  public void addOneInsertRowNode(InsertRowNode node, int index) {
+    insertRowNodeList.add(node); // the row and its index are appended in lockstep,
+    insertRowNodeIndexList.add(index); // so entry k of both lists describes one row
+  }
+
+  public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
+    PartialPath devicePath;
+    PlanNodeId planNodeId;
+    List<InsertRowNode> insertRowNodeList = new ArrayList<>();
+    List<Integer> insertRowNodeIndex = new ArrayList<>();
+    // No node type tag is read here (presumably the caller consumed it); the path is first.
+    try {
+      devicePath = new PartialPath(ReadWriteIOUtils.readString(byteBuffer));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertRowsOfOneDeviceNode", e);
+    }
+    // Row count, then per row: its timestamp followed by its measurements and values.
+    int size = byteBuffer.getInt();
+    for (int i = 0; i < size; i++) {
+      InsertRowNode insertRowNode = new InsertRowNode(new PlanNodeId(""));
+      insertRowNode.setDevicePath(devicePath);
+      insertRowNode.setTime(byteBuffer.getLong());
+      insertRowNode.deserializeMeasurementsAndValues(byteBuffer);
+      insertRowNodeList.add(insertRowNode);
+    }
+    for (int i = 0; i < size; i++) {
+      insertRowNodeIndex.add(byteBuffer.getInt());
+    }
+    // The plan node id trails the attributes; every row replaces its placeholder id with it.
+    planNodeId = PlanNodeId.deserialize(byteBuffer);
+    for (InsertRowNode insertRowNode : insertRowNodeList) {
+      insertRowNode.setPlanNodeId(planNodeId);
+    }
+    // Assemble the node from the decoded parts.
+    InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode = new InsertRowsOfOneDeviceNode(planNodeId);
+    insertRowsOfOneDeviceNode.setInsertRowNodeList(insertRowNodeList);
+    insertRowsOfOneDeviceNode.setInsertRowNodeIndexList(insertRowNodeIndex);
+    insertRowsOfOneDeviceNode.setDevicePath(devicePath);
+    return insertRowsOfOneDeviceNode;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Layout: node type tag, device path, row count, then (time, measurements and
+    // values) for each row, then one int index per row. The device path is written
+    // once for the whole node rather than once per row.
+    PlanNodeType.INSERT_ROWS_OF_ONE_DEVICE.serialize(byteBuffer);
+    ReadWriteIOUtils.write(devicePath.getFullPath(), byteBuffer);
+    byteBuffer.putInt(insertRowNodeList.size());
+    insertRowNodeList.forEach(
+        row -> {
+          byteBuffer.putLong(row.getTime());
+          row.serializeMeasurementsAndValues(byteBuffer);
+        });
+    insertRowNodeIndexList.forEach(byteBuffer::putInt);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 1b34a0ce04..0b9c307568 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -20,10 +20,15 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -83,13 +88,13 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
       PlanNodeId id,
       PartialPath devicePath,
       boolean isAligned,
-      MeasurementSchema[] measurementSchemas,
+      String[] measurements,
       TSDataType[] dataTypes,
       long[] times,
       BitMap[] bitMaps,
       Object[] columns,
       int rowCount) {
-    super(id, devicePath, isAligned, measurementSchemas, dataTypes);
+    super(id, devicePath, isAligned, measurements, dataTypes);
     this.times = times;
     this.bitMaps = bitMaps;
     this.columns = columns;
@@ -169,6 +174,148 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     return null;
   }
 
+  @Override
+  public boolean validateSchema(SchemaTree schemaTree) {
+    DeviceSchemaInfo deviceSchemaInfo =
+        schemaTree.searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements));
+    // An aligned/non-aligned mismatch with the schema tree's device rejects the tablet.
+    // todo partial insert
+    if (deviceSchemaInfo.isAligned() != isAligned) {
+      return false;
+    }
+    // Partial insert disabled: one type mismatch fails the tablet. Enabled: drop that column.
+    List<MeasurementSchema> measurementSchemas = deviceSchemaInfo.getMeasurementSchemaList();
+    for (int i = 0; i < measurementSchemas.size(); i++) {
+      if (dataTypes[i] != measurementSchemas.get(i).getType()) {
+        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          return false;
+        } else {
+          markFailedMeasurementInsertion(
+              i,
+              new DataTypeMismatchException(
+                  devicePath.getFullPath(),
+                  measurements[i],
+                  measurementSchemas.get(i).getType(),
+                  dataTypes[i]));
+        }
+      }
+    }
+    // NOTE(review): bitMaps is not compacted below and keeps its original length.
+    // filter failed measurements
+    measurements = Arrays.stream(measurements).filter(Objects::nonNull).toArray(String[]::new);
+    dataTypes = Arrays.stream(dataTypes).filter(Objects::nonNull).toArray(TSDataType[]::new);
+    columns = Arrays.stream(columns).filter(Objects::nonNull).toArray(Object[]::new);
+
+    return true;
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
+    // only single device in single storage group
+    List<WritePlanNode> result = new ArrayList<>();
+    if (times.length == 0) {
+      return Collections.emptyList();
+    }
+    long startTime =
+        (times[0] / StorageEngine.getTimePartitionInterval())
+            * StorageEngine.getTimePartitionInterval(); // included
+    long endTime = startTime + StorageEngine.getTimePartitionInterval(); // excluded
+    TTimePartitionSlot timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[0]);
+    int startLoc = 0; // included
+    // Step 1: cut the rows into consecutive runs that each stay inside one time partition.
+    List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+    // ranges is flat: range1.start, range1.end, range2.start, range2.end, ... (two per slot)
+    List<Integer> ranges = new ArrayList<>();
+    for (int i = 1; i < times.length; i++) { // times are sorted in session API.
+      if (times[i] >= endTime) {
+        // a new range.
+        ranges.add(startLoc); // included
+        ranges.add(i); // excluded
+        timePartitionSlots.add(timePartitionSlot);
+        // next init
+        startLoc = i;
+        startTime = endTime;
+        endTime =
+            (times[i] / StorageEngine.getTimePartitionInterval() + 1)
+                * StorageEngine.getTimePartitionInterval();
+        timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[i]);
+      }
+    }
+
+    // the final range
+    ranges.add(startLoc); // included
+    ranges.add(times.length); // excluded
+    timePartitionSlots.add(timePartitionSlot);
+    // Step 2: look up the target region of each run and group the runs by region.
+    // data region for each time partition (assumes one replica set per slot, in slot order)
+    List<TRegionReplicaSet> dataRegionReplicaSets =
+        analysis
+            .getDataPartitionInfo()
+            .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots);
+    // Slot i owns the pair (ranges[2 * i], ranges[2 * i + 1]), because ranges is flattened.
+    Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
+    for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
+      List<Integer> subRanges =
+          splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new ArrayList<>());
+      subRanges.add(ranges.get(2 * i));
+      subRanges.add(ranges.get(2 * i + 1));
+    }
+    // Step 3: build one sub-node per region by copying all of that region's row ranges.
+    List<Integer> locs;
+    for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) {
+      // generate a new times and values
+      locs = entry.getValue();
+      int count = 0;
+      for (int i = 0; i < locs.size(); i += 2) {
+        int start = locs.get(i);
+        int end = locs.get(i + 1);
+        count += end - start;
+      }
+      long[] subTimes = new long[count];
+      int destLoc = 0;
+      Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
+      BitMap[] bitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count);
+      for (int i = 0; i < locs.size(); i += 2) {
+        int start = locs.get(i);
+        int end = locs.get(i + 1);
+        System.arraycopy(times, start, subTimes, destLoc, end - start);
+        for (int k = 0; k < values.length; k++) {
+          System.arraycopy(columns[k], start, values[k], destLoc, end - start);
+          if (bitMaps != null && this.bitMaps[k] != null) {
+            BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, end - start);
+          }
+        }
+        destLoc += end - start;
+      }
+      InsertTabletNode subNode =
+          new InsertTabletNode(
+              getPlanNodeId(),
+              devicePath,
+              isAligned,
+              measurements,
+              dataTypes,
+              subTimes,
+              bitMaps,
+              values,
+              subTimes.length);
+      subNode.setRange(locs);
+      subNode.setDataRegionReplicaSet(entry.getKey());
+      result.add(subNode);
+    }
+    return result;
+  }
+
+  @Override
+  public void markFailedMeasurementInsertion(int index, Exception e) {
+    if (measurements[index] == null) { // already marked as failed
+      return;
+    }
+    super.markFailedMeasurementInsertion(index, e);
+    dataTypes[index] = null;
+    columns[index] = null;
+    if (bitMaps != null) bitMaps[index] = null; // bitMaps may be null for the whole tablet
+  }
+
   @Override
   public int serializedSize() {
     return serializedSize(0, rowCount);
@@ -188,13 +335,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
     size += serializeMeasurementSchemaSize();
 
-    // data types size
-    size += Integer.BYTES;
-    for (int i = 0; i < dataTypes.length; i++) {
-      if (measurements[i] != null) {
-        size += Byte.BYTES;
-      }
-    }
+    size += Byte.BYTES * dataTypes.length;
+
     // times size
     size += Integer.BYTES;
     size += Long.BYTES * (end - start);
@@ -217,7 +359,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         size += getColumnSize(dataTypes[i], columns[i], start, end);
       }
     }
-    size += Long.BYTES;
+
     size += Byte.BYTES;
     return size;
   }
@@ -267,19 +409,24 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   private void writeMeasurements(ByteBuffer buffer) {
-    buffer.putInt(measurementSchemas.length - countFailedMeasurements());
-    for (MeasurementSchema measurement : measurementSchemas) {
-      if (measurement != null) {
+    buffer.putInt(measurements.length);
+    // Layout: count, one flag byte, then the schemas (flag 1) or the names (flag 0).
+    // flag byte: 1 when full measurement schemas are written, 0 when only names are
+    buffer.put((byte) (measurementSchemas != null ? 1 : 0));
+
+    if (measurementSchemas != null) {
+      for (MeasurementSchema measurement : measurementSchemas) {
         measurement.serializeTo(buffer);
       }
+    } else {
+      for (String measurement : measurements) {
+        ReadWriteIOUtils.write(measurement, buffer);
+      }
     }
   }
 
   private void writeDataTypes(ByteBuffer buffer) {
     for (TSDataType dataType : dataTypes) {
-      if (dataType == null) {
-        continue;
-      }
       dataType.serializeTo(buffer);
     }
   }
@@ -295,15 +442,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     buffer.put(BytesUtils.boolToByte(bitMaps != null));
     if (bitMaps != null) {
       for (int i = 0; i < measurements.length; i++) {
-        // check failed measurement
-        if (measurements[i] != null) {
-          BitMap bitMap = bitMaps[i];
-          if (bitMap == null) {
-            buffer.put(BytesUtils.boolToByte(false));
-          } else {
-            buffer.put(BytesUtils.boolToByte(true));
-            buffer.put(bitMap.getByteArray());
-          }
+        BitMap bitMap = bitMaps[i];
+        if (bitMap == null) {
+          buffer.put(BytesUtils.boolToByte(false));
+        } else {
+          buffer.put(BytesUtils.boolToByte(true));
+          buffer.put(bitMap.getByteArray());
         }
       }
     }
@@ -311,9 +455,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   private void writeValues(ByteBuffer buffer) {
     for (int i = 0; i < dataTypes.length; i++) {
-      if (columns[i] == null) {
-        continue;
-      }
       serializeColumn(dataTypes[i], columns[i], buffer);
     }
   }
@@ -383,91 +524,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   }
 
   private void writeMeasurements(IWALByteBufferView buffer) {
-    buffer.putInt(measurementSchemas.length - countFailedMeasurements());
+    buffer.putInt(measurementSchemas.length);
     serializeMeasurementSchemaToWAL(buffer);
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
-    InsertTabletNode that = (InsertTabletNode) o;
-    return rowCount == that.rowCount
-        && Arrays.equals(times, that.times)
-        && Arrays.equals(bitMaps, that.bitMaps)
-        && equals(that.columns)
-        && Objects.equals(range, that.range);
-  }
-
-  private boolean equals(Object[] columns) {
-    if (this.columns == columns) {
-      return true;
-    }
-
-    if (columns == null || this.columns == null || columns.length != this.columns.length) {
-      return false;
-    }
-
-    for (int i = 0; i < columns.length; i++) {
-      if (dataTypes[i] != null) {
-        switch (dataTypes[i]) {
-          case INT32:
-            if (!Arrays.equals((int[]) this.columns[i], (int[]) columns[i])) {
-              return false;
-            }
-            break;
-          case INT64:
-            if (!Arrays.equals((long[]) this.columns[i], (long[]) columns[i])) {
-              return false;
-            }
-            break;
-          case FLOAT:
-            if (!Arrays.equals((float[]) this.columns[i], (float[]) columns[i])) {
-              return false;
-            }
-            break;
-          case DOUBLE:
-            if (!Arrays.equals((double[]) this.columns[i], (double[]) columns[i])) {
-              return false;
-            }
-            break;
-          case BOOLEAN:
-            if (!Arrays.equals((boolean[]) this.columns[i], (boolean[]) columns[i])) {
-              return false;
-            }
-            break;
-          case TEXT:
-            if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) columns[i])) {
-              return false;
-            }
-            break;
-          default:
-            throw new UnSupportedDataTypeException(
-                String.format(DATATYPE_UNSUPPORTED, dataTypes[i]));
-        }
-      } else if (!columns[i].equals(columns)) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = Objects.hash(super.hashCode(), rowCount, range);
-    result = 31 * result + Arrays.hashCode(times);
-    result = 31 * result + Arrays.hashCode(bitMaps);
-    result = 31 * result + Arrays.hashCode(columns);
-    return result;
-  }
-
   private void writeDataTypes(IWALByteBufferView buffer) {
     for (TSDataType dataType : dataTypes) {
-      if (dataType == null) {
-        continue;
-      }
       WALWriteUtils.write(dataType, buffer);
     }
   }
@@ -483,18 +545,15 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     buffer.put(BytesUtils.boolToByte(bitMaps != null));
     if (bitMaps != null) {
       for (int i = 0; i < measurements.length; i++) {
-        // check failed measurement
-        if (measurements[i] != null) {
-          BitMap bitMap = bitMaps[i];
-          if (bitMap == null) {
-            buffer.put(BytesUtils.boolToByte(false));
-          } else {
-            buffer.put(BytesUtils.boolToByte(true));
-            int len = end - start;
-            BitMap partBitMap = new BitMap(len);
-            BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
-            buffer.put(partBitMap.getByteArray());
-          }
+        BitMap bitMap = bitMaps[i];
+        if (bitMap == null) {
+          buffer.put(BytesUtils.boolToByte(false));
+        } else {
+          buffer.put(BytesUtils.boolToByte(true));
+          int len = end - start;
+          BitMap partBitMap = new BitMap(len);
+          BitMap.copyOfRange(bitMap, start, partBitMap, 0, len);
+          buffer.put(partBitMap.getByteArray());
         }
       }
     }
@@ -502,9 +561,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   private void writeValues(IWALByteBufferView buffer, int start, int end) {
     for (int i = 0; i < dataTypes.length; i++) {
-      if (columns[i] == null) {
-        continue;
-      }
       serializeColumn(dataTypes[i], columns[i], buffer, start, end);
     }
   }
@@ -554,102 +610,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     }
   }
 
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    // only single device in single storage group
-    List<WritePlanNode> result = new ArrayList<>();
-    if (times.length == 0) {
-      return Collections.emptyList();
-    }
-    long startTime =
-        (times[0] / StorageEngineV2.getTimePartitionInterval())
-            * StorageEngineV2.getTimePartitionInterval(); // included
-    long endTime = startTime + StorageEngineV2.getTimePartitionInterval(); // excluded
-    TTimePartitionSlot timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[0]);
-    int startLoc = 0; // included
-
-    List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
-    // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ...
-    List<Integer> ranges = new ArrayList<>();
-    for (int i = 1; i < times.length; i++) { // times are sorted in session API.
-      if (times[i] >= endTime) {
-        // a new range.
-        ranges.add(startLoc); // included
-        ranges.add(i); // excluded
-        timePartitionSlots.add(timePartitionSlot);
-        // next init
-        startLoc = i;
-        startTime = endTime;
-        endTime =
-            (times[i] / StorageEngineV2.getTimePartitionInterval() + 1)
-                * StorageEngineV2.getTimePartitionInterval();
-        timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[i]);
-      }
-    }
-
-    // the final range
-    ranges.add(startLoc); // included
-    ranges.add(times.length); // excluded
-    timePartitionSlots.add(timePartitionSlot);
-
-    // data region for each time partition
-    List<TRegionReplicaSet> dataRegionReplicaSets =
-        analysis
-            .getDataPartitionInfo()
-            .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots);
-
-    Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
-    for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
-      List<Integer> sub_ranges =
-          splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new ArrayList<>());
-      sub_ranges.add(ranges.get(i));
-      sub_ranges.add(ranges.get(i + 1));
-    }
-
-    List<Integer> locs;
-    for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) {
-      // generate a new times and values
-      locs = entry.getValue();
-      int count = 0;
-      for (int i = 0; i < locs.size(); i += 2) {
-        int start = locs.get(i);
-        int end = locs.get(i + 1);
-        count += end - start;
-      }
-      long[] subTimes = new long[count];
-      int destLoc = 0;
-      Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
-      BitMap[] bitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count);
-      for (int i = 0; i < locs.size(); i += 2) {
-        int start = locs.get(i);
-        int end = locs.get(i + 1);
-        System.arraycopy(times, start, subTimes, destLoc, end - start);
-        for (int k = 0; k < values.length; k++) {
-          System.arraycopy(columns[k], start, values[k], destLoc, end - start);
-          if (bitMaps != null && this.bitMaps[k] != null) {
-            BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, end - start);
-          }
-        }
-        destLoc += end - start;
-      }
-      InsertTabletNode subNode =
-          new InsertTabletNode(
-              getPlanNodeId(),
-              devicePath,
-              isAligned,
-              measurementSchemas,
-              dataTypes,
-              subTimes,
-              bitMaps,
-              values,
-              subTimes.length);
-      subNode.setRange(locs);
-      subNode.setDataRegionReplicaSet(entry.getKey());
-      result.add(subNode);
-    }
-    return result;
-  }
-
   private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) {
     Object[] values = new Object[columnSize];
     for (int i = 0; i < values.length; i++) {
@@ -687,24 +647,33 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   public static InsertTabletNode deserialize(ByteBuffer byteBuffer) {
     InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
-    try {
-      insertNode.subDeserialize(byteBuffer);
-    } catch (IllegalPathException e) {
-      throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
-    }
+    insertNode.subDeserialize(byteBuffer);
     insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
     return insertNode;
   }
 
-  private void subDeserialize(ByteBuffer buffer) throws IllegalPathException {
-    this.devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+  public void subDeserialize(ByteBuffer buffer) {
+    try {
+      this.devicePath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+    } catch (IllegalPathException e) {
+      throw new IllegalArgumentException("Cannot deserialize InsertTabletNode", e);
+    }
 
     int measurementSize = buffer.getInt();
     this.measurements = new String[measurementSize];
-    this.measurementSchemas = new MeasurementSchema[measurementSize];
-    for (int i = 0; i < measurementSize; i++) {
-      measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
-      measurements[i] = measurementSchemas[i].getMeasurementId();
+
+    boolean hasSchema = buffer.get() == 1;
+
+    if (hasSchema) {
+      this.measurementSchemas = new MeasurementSchema[measurementSize];
+      for (int i = 0; i < measurementSize; i++) {
+        measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
+        measurements[i] = measurementSchemas[i].getMeasurementId();
+      }
+    } else {
+      for (int i = 0; i < measurementSize; i++) {
+        measurements[i] = ReadWriteIOUtils.readString(buffer);
+      }
     }
 
     this.dataTypes = new TSDataType[measurementSize];
@@ -761,4 +730,80 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
         QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes, measurementSize, rows);
     this.isAligned = stream.readByte() == 1;
   }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hash(super.hashCode(), rowCount, range);
+    result = 31 * result + Arrays.hashCode(times);
+    result = 31 * result + Arrays.hashCode(bitMaps);
+    result = 31 * result + Arrays.deepHashCode(columns); // content hash, as equals compares
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    // Same runtime class (not just instanceof) and equal inherited state are required.
+    if (o == null || getClass() != o.getClass() || !super.equals(o)) return false;
+    InsertTabletNode other = (InsertTabletNode) o;
+    return rowCount == other.rowCount
+        && Arrays.equals(times, other.times)
+        && Arrays.equals(bitMaps, other.bitMaps)
+        && equals(other.columns)
+        && Objects.equals(range, other.range);
+  }
+
+  private boolean equals(Object[] columns) {
+    if (this.columns == columns) {
+      return true;
+    }
+    // Differing null-ness or column count can never be equal.
+    if (columns == null || this.columns == null || columns.length != this.columns.length) {
+      return false;
+    }
+    // Each column is a typed array; this node's dataTypes[i] selects the element-wise compare.
+    for (int i = 0; i < columns.length; i++) {
+      if (dataTypes[i] != null) {
+        switch (dataTypes[i]) {
+          case INT32:
+            if (!Arrays.equals((int[]) this.columns[i], (int[]) columns[i])) {
+              return false;
+            }
+            break;
+          case INT64:
+            if (!Arrays.equals((long[]) this.columns[i], (long[]) columns[i])) {
+              return false;
+            }
+            break;
+          case FLOAT:
+            if (!Arrays.equals((float[]) this.columns[i], (float[]) columns[i])) {
+              return false;
+            }
+            break;
+          case DOUBLE:
+            if (!Arrays.equals((double[]) this.columns[i], (double[]) columns[i])) {
+              return false;
+            }
+            break;
+          case BOOLEAN:
+            if (!Arrays.equals((boolean[]) this.columns[i], (boolean[]) columns[i])) {
+              return false;
+            }
+            break;
+          case TEXT:
+            if (!Arrays.equals((Binary[]) this.columns[i], (Binary[]) columns[i])) {
+              return false;
+            }
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format(DATATYPE_UNSUPPORTED, dataTypes[i]));
+        }
+      } else if (!Objects.equals(this.columns[i], columns[i])) {
+        return false;
+      }
+    }
+    // Untyped slots (dataTypes[i] == null) were compared null-safely, element to element.
+    return true;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java
new file mode 100644
index 0000000000..6592cadd52
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/BatchInsert.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.crud;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+
+/** A BatchInsert contains multiple sub-inserts. */
+public interface BatchInsert {
+  // NOTE(review): each list presumably holds one entry per sub-insert, in the same order.
+  List<PartialPath> getDevicePaths();
+
+  List<String[]> getMeasurementsList();
+
+  List<TSDataType[]> getDataTypesList();
+
+  List<Boolean> getAlignedList();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
index ab54768a17..cd4e9ec890 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
@@ -19,13 +19,9 @@
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public abstract class InsertBaseStatement extends Statement {
 
   /**
@@ -40,9 +36,6 @@ public abstract class InsertBaseStatement extends Statement {
   // get from client
   protected TSDataType[] dataTypes;
 
-  // record the failed measurements, their reasons, and positions in "measurements"
-  List<String> failedMeasurements;
-
   public PartialPath getDevicePath() {
     return devicePath;
   }
@@ -74,28 +67,4 @@ public abstract class InsertBaseStatement extends Statement {
   public void setAligned(boolean aligned) {
     isAligned = aligned;
   }
-
-  public List<String> getFailedMeasurements() {
-    return failedMeasurements;
-  }
-
-  public abstract boolean checkDataType(SchemaTree schemaTree);
-
-  /**
-   * This method is overrided in InsertRowPlan and InsertTabletPlan. After marking failed
-   * measurements, the failed values or columns would be null as well. We'd better use
-   * "measurements[index] == null" to determine if the measurement failed.
-   *
-   * @param index failed measurement index
-   */
-  public void markFailedMeasurementInsertion(int index, Exception e) {
-    if (measurements[index] == null) {
-      return;
-    }
-    if (failedMeasurements == null) {
-      failedMeasurements = new ArrayList<>();
-    }
-    failedMeasurements.add(measurements[index]);
-    measurements[index] = null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java
index 159fe87743..5b7aa98e9f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertMultiTabletsStatement.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -72,16 +71,6 @@ public class InsertMultiTabletsStatement extends InsertBaseStatement {
     return alignedList;
   }
 
-  @Override
-  public boolean checkDataType(SchemaTree schemaTree) {
-    for (InsertTabletStatement insertTabletStatement : insertTabletStatementList) {
-      if (!insertTabletStatement.checkDataType(schemaTree)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertMultiTablets(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
index 6d2ea03035..72dfed7322 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
@@ -20,30 +20,18 @@
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.constant.StatementType;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 public class InsertRowStatement extends InsertBaseStatement {
-  private static final Logger logger = LoggerFactory.getLogger(InsertRowStatement.class);
 
   private static final byte TYPE_RAW_STRING = -1;
   private static final byte TYPE_NULL = -2;
@@ -117,92 +105,10 @@ public class InsertRowStatement extends InsertBaseStatement {
     }
   }
 
-  /**
-   * if inferType is true, transfer String[] values to specific data types (Integer, Long, Float,
-   * Double, Binary)
-   */
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  public void transferType(SchemaTree schemaTree) throws QueryProcessException {
-    List<MeasurementSchema> measurementSchemas =
-        schemaTree
-            .searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements))
-            .getMeasurementSchemaList();
-    if (isNeedInferType) {
-      for (int i = 0; i < measurementSchemas.size(); i++) {
-        if (measurementSchemas.get(i) == null) {
-          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-            markFailedMeasurementInsertion(
-                i,
-                new QueryProcessException(
-                    new PathNotExistException(
-                        devicePath.getFullPath()
-                            + IoTDBConstant.PATH_SEPARATOR
-                            + measurements[i])));
-          } else {
-            throw new QueryProcessException(
-                new PathNotExistException(
-                    devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
-          }
-          continue;
-        }
-
-        dataTypes[i] = measurementSchemas.get(i).getType();
-        try {
-          values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
-        } catch (Exception e) {
-          logger.warn(
-              "{}.{} data type is not consistent, input {}, registered {}",
-              devicePath,
-              measurements[i],
-              values[i],
-              dataTypes[i]);
-          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-            markFailedMeasurementInsertion(i, e);
-          } else {
-            throw e;
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void markFailedMeasurementInsertion(int index, Exception e) {
-    if (measurements[index] == null) {
-      return;
-    }
-    super.markFailedMeasurementInsertion(index, e);
-    values[index] = null;
-    dataTypes[index] = null;
-  }
-
   public List<TTimePartitionSlot> getTimePartitionSlots() {
     return Collections.singletonList(StorageEngineV2.getTimePartitionSlot(time));
   }
 
-  public boolean checkDataType(SchemaTree schemaTree) {
-    List<MeasurementSchema> measurementSchemas =
-        schemaTree
-            .searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements))
-            .getMeasurementSchemaList();
-    for (int i = 0; i < measurementSchemas.size(); i++) {
-      if (dataTypes[i] != measurementSchemas.get(i).getType()) {
-        if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-          return false;
-        } else {
-          markFailedMeasurementInsertion(
-              i,
-              new DataTypeMismatchException(
-                  devicePath.getFullPath(),
-                  measurements[i],
-                  measurementSchemas.get(i).getType(),
-                  dataTypes[i]));
-        }
-      }
-    }
-    return true;
-  }
-
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRow(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java
index 438642ad43..71eccaa20d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.sql.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -75,22 +73,6 @@ public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
     return new ArrayList<>(timePartitionSlotSet);
   }
 
-  @Override
-  public boolean checkDataType(SchemaTree schemaTree) {
-    for (InsertRowStatement insertRowStatement : insertRowStatementList) {
-      if (!insertRowStatement.checkDataType(schemaTree)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  public void transferType(SchemaTree schemaTree) throws QueryProcessException {
-    for (InsertRowStatement insertRowStatement : insertRowStatementList) {
-      insertRowStatement.transferType(schemaTree);
-    }
-  }
-
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRowsOfOneDevice(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java
index f778d2d7d0..d740a2290f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowsStatement.java
@@ -19,9 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -73,22 +71,6 @@ public class InsertRowsStatement extends InsertBaseStatement {
     this.insertRowStatementList = insertRowStatementList;
   }
 
-  @Override
-  public boolean checkDataType(SchemaTree schemaTree) {
-    for (InsertRowStatement insertRowStatement : insertRowStatementList) {
-      if (!insertRowStatement.checkDataType(schemaTree)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  public void transferType(SchemaTree schemaTree) throws QueryProcessException {
-    for (InsertRowStatement insertRowStatement : insertRowStatementList) {
-      insertRowStatement.transferType(schemaTree);
-    }
-  }
-
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRows(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index 1d08cba075..612d2f8463 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -19,16 +19,12 @@
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.StorageEngineV2;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 public class InsertTabletStatement extends InsertBaseStatement {
@@ -71,31 +67,20 @@ public class InsertTabletStatement extends InsertBaseStatement {
     this.times = times;
   }
 
-  @Override
-  public void markFailedMeasurementInsertion(int index, Exception e) {
-    if (measurements[index] == null) {
-      return;
-    }
-    super.markFailedMeasurementInsertion(index, e);
-    dataTypes[index] = null;
-    columns[index] = null;
-    bitMaps[index] = null;
-  }
-
   public List<TTimePartitionSlot> getTimePartitionSlots() {
     List<TTimePartitionSlot> result = new ArrayList<>();
     long startTime =
-        (times[0] / StorageEngineV2.getTimePartitionInterval())
-            * StorageEngineV2.getTimePartitionInterval(); // included
-    long endTime = startTime + StorageEngineV2.getTimePartitionInterval(); // excluded
+        (times[0] / StorageEngine.getTimePartitionInterval())
+            * StorageEngine.getTimePartitionInterval(); // included
+    long endTime = startTime + StorageEngine.getTimePartitionInterval(); // excluded
     TTimePartitionSlot timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[0]);
     for (int i = 1; i < times.length; i++) { // times are sorted in session API.
       if (times[i] >= endTime) {
         result.add(timePartitionSlot);
         // next init
         endTime =
-            (times[i] / StorageEngineV2.getTimePartitionInterval() + 1)
-                * StorageEngineV2.getTimePartitionInterval();
+            (times[i] / StorageEngine.getTimePartitionInterval() + 1)
+                * StorageEngine.getTimePartitionInterval();
         timePartitionSlot = StorageEngineV2.getTimePartitionSlot(times[i]);
       }
     }
@@ -103,30 +88,6 @@ public class InsertTabletStatement extends InsertBaseStatement {
     return result;
   }
 
-  @Override
-  public boolean checkDataType(SchemaTree schemaTree) {
-    List<MeasurementSchema> measurementSchemas =
-        schemaTree
-            .searchDeviceSchemaInfo(devicePath, Arrays.asList(measurements))
-            .getMeasurementSchemaList();
-    for (int i = 0; i < measurementSchemas.size(); i++) {
-      if (dataTypes[i] != measurementSchemas.get(i).getType()) {
-        if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-          return false;
-        } else {
-          markFailedMeasurementInsertion(
-              i,
-              new DataTypeMismatchException(
-                  devicePath.getFullPath(),
-                  measurements[i],
-                  measurementSchemas.get(i).getType(),
-                  dataTypes[i]));
-        }
-      }
-    }
-    return true;
-  }
-
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertTablet(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index eb77ccb6cd..493b8466a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -37,12 +37,18 @@ import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.analyze.SchemaValidator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
@@ -93,9 +99,22 @@ public class InternalServiceImpl implements InternalService.Iface {
         return new TSendFragmentInstanceResp(!info.getState().isFailed());
       case WRITE:
         TSendFragmentInstanceResp response = new TSendFragmentInstanceResp();
-        ConsensusWriteResponse resp =
-            ConsensusImpl.getInstance()
-                .write(groupId, new ByteBufferConsensusRequest(req.fragmentInstance.body));
+        ConsensusWriteResponse resp;
+
+        FragmentInstance fragmentInstance =
+            FragmentInstance.deserializeFrom(req.fragmentInstance.body);
+        PlanNode planNode = fragmentInstance.getFragment().getRoot();
+        if (planNode instanceof InsertNode) {
+          try {
+            SchemaTree schemaTree = SchemaValidator.validate((InsertNode) planNode);
+            ((InsertNode) planNode).setMeasurementSchemas(schemaTree);
+          } catch (SemanticException e) {
+            response.setAccepted(false);
+            response.setMessage(e.getMessage());
+            return response;
+          }
+        }
+        resp = ConsensusImpl.getInstance().write(groupId, fragmentInstance);
         // TODO need consider more status
         response.setAccepted(
             TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index a49e05acbd..1f043fb1bf 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -117,14 +117,18 @@ public class DataRegionTest {
       values[i] = record.dataPointList.get(i).getValue();
     }
     QueryId queryId = new QueryId("test_write");
-    return new InsertRowNode(
-        queryId.genPlanNodeId(),
-        new PartialPath(record.deviceId),
-        false,
-        measurementSchemas,
-        dataTypes,
-        record.time,
-        values);
+    InsertRowNode insertRowNode =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath(record.deviceId),
+            false,
+            measurements,
+            dataTypes,
+            record.time,
+            values,
+            false);
+    insertRowNode.setMeasurementSchemas(measurementSchemas);
+    return insertRowNode;
   }
 
   @Test
@@ -276,12 +280,13 @@ public class DataRegionTest {
             new QueryId("test_write").genPlanNodeId(),
             new PartialPath("root.vehicle.d0"),
             false,
-            measurementSchemas,
+            measurements,
             dataTypes,
             times,
             null,
             columns,
             times.length);
+    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -297,12 +302,13 @@ public class DataRegionTest {
             new QueryId("test_write").genPlanNodeId(),
             new PartialPath("root.vehicle.d0"),
             false,
-            measurementSchemas,
+            measurements,
             dataTypes,
             times,
             null,
             columns,
             times.length);
+    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -444,12 +450,13 @@ public class DataRegionTest {
             new QueryId("test_write").genPlanNodeId(),
             new PartialPath("root.vehicle.d0"),
             false,
-            measurementSchemas,
+            measurements,
             dataTypes,
             times,
             null,
             columns,
             times.length);
+    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -464,12 +471,13 @@ public class DataRegionTest {
             new QueryId("test_write").genPlanNodeId(),
             new PartialPath("root.vehicle.d0"),
             false,
-            measurementSchemas,
+            measurements,
             dataTypes,
             times,
             null,
             columns,
             times.length);
+    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -534,12 +542,13 @@ public class DataRegionTest {
             new QueryId("test_write").genPlanNodeId(),
             new PartialPath("root.vehicle.d0"),
             false,
-            measurementSchemas,
+            measurements,
             dataTypes,
             times,
             null,
             columns,
             times.length);
+    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -554,12 +563,13 @@ public class DataRegionTest {
             new QueryId("test_write").genPlanNodeId(),
             new PartialPath("root.vehicle.d0"),
             false,
-            measurementSchemas,
+            measurements,
             dataTypes,
             times,
             null,
             columns,
             times.length);
+    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -624,12 +634,13 @@ public class DataRegionTest {
             new QueryId("test_write").genPlanNodeId(),
             new PartialPath("root.vehicle.d0"),
             false,
-            measurementSchemas,
+            measurements,
             dataTypes,
             times,
             null,
             columns,
             times.length);
+    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -644,12 +655,13 @@ public class DataRegionTest {
             new QueryId("test_write").genPlanNodeId(),
             new PartialPath("root.vehicle.d0"),
             false,
-            measurementSchemas,
+            measurements,
             dataTypes,
             times,
             null,
             columns,
             times.length);
+    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index f97bf51030..999fb8990f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -463,15 +463,20 @@ public class TsFileProcessorV2Test {
         ((long[]) columns[i])[(int) r] = r;
       }
     }
-    return new InsertTabletNode(
-        new QueryId("test_write").genPlanNodeId(),
-        new PartialPath(deviceId),
-        isAligned,
-        schemas,
-        dataTypes,
-        times,
-        null,
-        columns,
-        times.length);
+
+    InsertTabletNode insertTabletNode =
+        new InsertTabletNode(
+            new QueryId("test_write").genPlanNodeId(),
+            new PartialPath(deviceId),
+            isAligned,
+            measurements,
+            dataTypes,
+            times,
+            null,
+            columns,
+            times.length);
+    insertTabletNode.setMeasurementSchemas(schemas);
+
+    return insertTabletNode;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 63e0e59836..0eef8c3f02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -313,12 +313,17 @@ public class DistributionPlannerTest {
             queryId.genPlanNodeId(),
             new PartialPath("root.sg.d1"),
             false,
-            new MeasurementSchema[] {
-              new MeasurementSchema("s1", TSDataType.INT32),
+            new String[] {
+              "s1",
             },
             new TSDataType[] {TSDataType.INT32},
             1L,
-            new Object[] {10});
+            new Object[] {10},
+            false);
+    insertRowNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.INT32),
+        });
 
     Analysis analysis = constructAnalysis();
 
@@ -340,24 +345,30 @@ public class DistributionPlannerTest {
             queryId.genPlanNodeId(),
             new PartialPath("root.sg.d1"),
             false,
-            new MeasurementSchema[] {
-              new MeasurementSchema("s1", TSDataType.INT32),
-            },
+            new String[] {"s1"},
             new TSDataType[] {TSDataType.INT32},
             1L,
-            new Object[] {10});
+            new Object[] {10},
+            false);
+    insertRowNode1.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.INT32),
+        });
 
     InsertRowNode insertRowNode2 =
         new InsertRowNode(
             queryId.genPlanNodeId(),
             new PartialPath("root.sg.d1"),
             false,
-            new MeasurementSchema[] {
-              new MeasurementSchema("s1", TSDataType.INT32),
-            },
+            new String[] {"s1"},
             new TSDataType[] {TSDataType.INT32},
             100000L,
-            new Object[] {10});
+            new Object[] {10},
+            false);
+    insertRowNode2.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.INT32),
+        });
 
     InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
     node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertMultiTabletsNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
new file mode 100644
index 0000000000..a34b0eb68e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertMultiTabletsNodeSerdeTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class InsertMultiTabletsNodeSerdeTest {
+
+  @Test
+  public void testInsertMultiTabletPlan()
+      throws QueryProcessException, MetadataException, InterruptedException,
+          QueryFilterOptimizationException, StorageEngineException, IOException {
+    long[] times = new long[] {110L, 111L, 112L, 113L};
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    Object[] columns = new Object[6];
+    columns[0] = new double[4];
+    columns[1] = new float[4];
+    columns[2] = new long[4];
+    columns[3] = new int[4];
+    columns[4] = new boolean[4];
+    columns[5] = new Binary[4];
+
+    for (int r = 0; r < 4; r++) {
+      ((double[]) columns[0])[r] = 1.0;
+      ((float[]) columns[1])[r] = 2;
+      ((long[]) columns[2])[r] = 10000;
+      ((int[]) columns[3])[r] = 100;
+      ((boolean[]) columns[4])[r] = false;
+      ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+    }
+
+    PlanNodeId planNodeId = new PlanNodeId("plan node 1");
+
+    InsertMultiTabletsNode insertMultiTabletsNode = new InsertMultiTabletsNode(planNodeId);
+
+    for (int i = 0; i < 10; i++) {
+      insertMultiTabletsNode.addInsertTabletNode(
+          new InsertTabletNode(
+              planNodeId,
+              new PartialPath("root.multi.d" + i),
+              false,
+              new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+              dataTypes,
+              times,
+              null,
+              columns,
+              times.length),
+          i);
+    }
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+    insertMultiTabletsNode.serialize(byteBuffer);
+    byteBuffer.flip();
+
+    Assert.assertEquals(PlanNodeType.INSERT_MULTI_TABLET.ordinal(), byteBuffer.getShort());
+
+    Assert.assertEquals(InsertMultiTabletsNode.deserialize(byteBuffer), insertMultiTabletsNode);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
index 03cc69769c..229fbd0722 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowNodeSerdeTest.java
@@ -24,12 +24,16 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class InsertRowNodeSerdeTest {
@@ -46,18 +50,50 @@ public class InsertRowNodeSerdeTest {
 
     Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
 
-    // Test with failed column
-    insertRowNode = getInsertRowNodeWithFailedColumn();
+    insertRowNode = getInsertRowNodeWithMeasurementSchemas();
     byteBuffer = ByteBuffer.allocate(10000);
     insertRowNode.serialize(byteBuffer);
     byteBuffer.flip();
 
     Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
 
-    InsertRowNode tmpNode = InsertRowNode.deserialize(byteBuffer);
+    Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+
+    insertRowNode = getInsertRowNodeWithStringValue();
+    byteBuffer = ByteBuffer.allocate(10000);
+    insertRowNode.serialize(byteBuffer);
+    byteBuffer.flip();
+
+    Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), byteBuffer.getShort());
+
+    Assert.assertEquals(InsertRowNode.deserialize(byteBuffer), insertRowNode);
+  }
+
+  @Test
+  public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
+    InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas();
+
+    int serializedSize = insertRowNode.serializedSize();
+
+    Assert.assertEquals(serializedSize, 125);
+
+    byte[] bytes = new byte[serializedSize];
+    WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
+
+    insertRowNode.serializeToWAL(walBuffer);
+
+    DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
+
+    Assert.assertEquals(PlanNodeType.INSERT_ROW.ordinal(), dataInputStream.readShort());
+
+    InsertRowNode tmpNode = InsertRowNode.deserialize(dataInputStream);
 
     Assert.assertEquals(tmpNode.getTime(), insertRowNode.getTime());
-    Assert.assertEquals(tmpNode.getMeasurements(), new String[] {"s1", "s3", "s5"});
+    Assert.assertEquals(tmpNode.getDevicePath(), insertRowNode.getDevicePath());
+    Assert.assertEquals(tmpNode.isAligned(), insertRowNode.isAligned());
+    Assert.assertArrayEquals(tmpNode.getValues(), insertRowNode.getValues());
+    Assert.assertArrayEquals(
+        tmpNode.getMeasurementSchemas(), insertRowNode.getMeasurementSchemas());
   }
 
   private InsertRowNode getInsertRowNode() throws IllegalPathException {
@@ -82,45 +118,83 @@ public class InsertRowNodeSerdeTest {
         new PlanNodeId("plannode 1"),
         new PartialPath("root.isp.d1"),
         false,
+        new String[] {"s1", "s2", "s3", "s4", "s5"},
+        dataTypes,
+        time,
+        columns,
+        false);
+  }
+
+  private InsertRowNode getInsertRowNodeWithMeasurementSchemas() throws IllegalPathException {
+    long time = 80L;
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+        };
+    // One boxed value per measurement, in the same order as dataTypes.
+    Object[] columns = new Object[5];
+    columns[0] = 5.0;
+    columns[1] = 6.0f;
+    columns[2] = 1000L;
+    columns[3] = 10;
+    columns[4] = true;
+
+    InsertRowNode insertRowNode =
+        new InsertRowNode(
+            new PlanNodeId("plannode 2"),
+            new PartialPath("root.isp.d2"),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            dataTypes,
+            time,
+            columns,
+            false);
+    // Schemas are attached via the setter; the constructor only takes measurement names.
+    insertRowNode.setMeasurementSchemas(
         new MeasurementSchema[] {
           new MeasurementSchema("s1", TSDataType.DOUBLE),
           new MeasurementSchema("s2", TSDataType.FLOAT),
           new MeasurementSchema("s3", TSDataType.INT64),
           new MeasurementSchema("s4", TSDataType.INT32),
           new MeasurementSchema("s5", TSDataType.BOOLEAN)
-        },
-        dataTypes,
-        time,
-        columns);
+        });
+
+    return insertRowNode;
   }
 
-  private InsertRowNode getInsertRowNodeWithFailedColumn() throws IllegalPathException {
+  private InsertRowNode getInsertRowNodeWithStringValue() throws IllegalPathException {
     long time = 110L;
     TSDataType[] dataTypes =
         new TSDataType[] {
-          TSDataType.DOUBLE, null, TSDataType.INT64, null, TSDataType.BOOLEAN,
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
         };
 
     Object[] columns = new Object[5];
-    columns[0] = 1.0;
-    columns[1] = null;
-    columns[2] = 10000l;
-    columns[3] = null;
-    columns[4] = false;
-
-    return new InsertRowNode(
-        new PlanNodeId("plannode 1"),
-        new PartialPath("root.isp.d1"),
-        false,
-        new MeasurementSchema[] {
-          new MeasurementSchema("s1", TSDataType.DOUBLE),
-          null,
-          new MeasurementSchema("s3", TSDataType.INT64),
-          null,
-          new MeasurementSchema("s5", TSDataType.BOOLEAN)
-        },
-        dataTypes,
-        time,
-        columns);
+    columns[0] = "1.0";
+    columns[1] = "2.0";
+    columns[2] = "10000";
+    columns[3] = "100";
+    columns[4] = "false";
+    // Every value is a String and the type array passed below is all null.
+    InsertRowNode insertRowNode =
+        new InsertRowNode(
+            new PlanNodeId("plannode 1"),
+            new PartialPath("root.isp.d1"),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            new TSDataType[5], // NOTE(review): the dataTypes local above is never used
+            time,
+            columns,
+            false);
+    insertRowNode.setNeedInferType(true); // presumably infers types from the strings - confirm
+    return insertRowNode;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsNodeSerdeTest.java
new file mode 100644
index 0000000000..99f949e532
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsNodeSerdeTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class InsertRowsNodeSerdeTest {
+  /** Round-trips a two-row InsertRowsNode through a ByteBuffer and expects an equal node. */
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    InsertRowsNode node = new InsertRowsNode(new PlanNodeId("plan node 1"));
+    node.addOneInsertRowNode(
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            new PartialPath("root.sg.d1"),
+            false,
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
+            1000L,
+            new Object[] {1.0, 2f, 300L},
+            false),
+        0);
+    // Second row targets another device and a different measurement set.
+    node.addOneInsertRowNode(
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            new PartialPath("root.sg.d2"),
+            false,
+            new String[] {"s1", "s4"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+            2000L,
+            new Object[] {2.0, false},
+            false),
+        1);
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+    node.serialize(byteBuffer);
+    byteBuffer.flip();
+    // Expect the node type as the leading short; getShort() also consumes it.
+    Assert.assertEquals(PlanNodeType.INSERT_ROWS.ordinal(), byteBuffer.getShort());
+
+    Assert.assertEquals(node, InsertRowsNode.deserialize(byteBuffer));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
new file mode 100644
index 0000000000..a93cf246a0
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan.node.write;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class InsertRowsOfOneDeviceNodeSerdeTest {
+  /** Round-trips a two-row, single-device node through a ByteBuffer and expects an equal node. */
+  @Test
+  public void TestSerializeAndDeserialize() throws IllegalPathException {
+    PartialPath device = new PartialPath("root.sg.d");
+    InsertRowsOfOneDeviceNode node = new InsertRowsOfOneDeviceNode(new PlanNodeId("plan node 1"));
+    node.setDevicePath(device);
+    node.addOneInsertRowNode(
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            device,
+            false,
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
+            1000L,
+            new Object[] {1.0, 2f, 300L},
+            false),
+        0);
+    // Second row shares the device but uses a different time and measurement set.
+    node.addOneInsertRowNode(
+        new InsertRowNode(
+            new PlanNodeId("plan node 1"),
+            device,
+            false,
+            new String[] {"s1", "s4"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+            2000L,
+            new Object[] {2.0, false},
+            false),
+        1);
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+    node.serialize(byteBuffer);
+    byteBuffer.flip();
+    // Expect the node type as the leading short; getShort() also consumes it.
+    Assert.assertEquals(PlanNodeType.INSERT_ROWS_OF_ONE_DEVICE.ordinal(), byteBuffer.getShort());
+
+    Assert.assertEquals(node, InsertRowsOfOneDeviceNode.deserialize(byteBuffer));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
index 46abaec468..390ca019bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/write/InsertTabletNodeSerdeTest.java
@@ -24,12 +24,16 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public class InsertTabletNodeSerdeTest {
@@ -45,6 +49,43 @@ public class InsertTabletNodeSerdeTest {
     Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
 
     Assert.assertEquals(InsertTabletNode.deserialize(byteBuffer), insertTabletNode);
+
+    insertTabletNode = getInsertTabletNodeWithSchema();
+    byteBuffer = ByteBuffer.allocate(10000);
+    insertTabletNode.serialize(byteBuffer);
+    byteBuffer.flip();
+
+    Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), byteBuffer.getShort());
+
+    Assert.assertEquals(InsertTabletNode.deserialize(byteBuffer), insertTabletNode);
+  }
+
+  @Test
+  public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, IOException {
+    InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
+
+    int serializedSize = insertTabletNode.serializedSize();
+    // Pin the reported size; exactly that many bytes are allocated for the WAL buffer below.
+    Assert.assertEquals(229, serializedSize);
+
+    byte[] bytes = new byte[serializedSize];
+    WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes));
+
+    insertTabletNode.serializeToWAL(walBuffer);
+
+    DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes));
+    // Expect the node type as the leading short; readShort() also consumes it.
+    Assert.assertEquals(PlanNodeType.INSERT_TABLET.ordinal(), dataInputStream.readShort());
+
+    InsertTabletNode tmpNode = InsertTabletNode.deserialize(dataInputStream);
+    // Expected (original node) first, actual (deserialized node) second.
+    Assert.assertArrayEquals(insertTabletNode.getTimes(), tmpNode.getTimes());
+    Assert.assertEquals(insertTabletNode.getDevicePath(), tmpNode.getDevicePath());
+    Assert.assertEquals(insertTabletNode.isAligned(), tmpNode.isAligned());
+    Assert.assertArrayEquals(insertTabletNode.getColumns(), tmpNode.getColumns());
+    Assert.assertArrayEquals(insertTabletNode.getBitMaps(), tmpNode.getBitMaps());
+    Assert.assertArrayEquals(
+        insertTabletNode.getMeasurementSchemas(), tmpNode.getMeasurementSchemas());
   }
 
   private InsertTabletNode getInsertTabletNode() throws IllegalPathException {
@@ -76,13 +117,7 @@ public class InsertTabletNodeSerdeTest {
             new PlanNodeId("plannode 1"),
             new PartialPath("root.isp.d1"),
             false,
-            new MeasurementSchema[] {
-              new MeasurementSchema("s1", TSDataType.DOUBLE),
-              new MeasurementSchema("s2", TSDataType.FLOAT),
-              new MeasurementSchema("s3", TSDataType.INT64),
-              new MeasurementSchema("s4", TSDataType.INT32),
-              new MeasurementSchema("s5", TSDataType.BOOLEAN)
-            },
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
             dataTypes,
             times,
             null,
@@ -91,4 +126,51 @@ public class InsertTabletNodeSerdeTest {
 
     return tabletNode;
   }
+
+  /** Builds a four-row, five-column tablet node that also carries its measurement schemas. */
+  private InsertTabletNode getInsertTabletNodeWithSchema() throws IllegalPathException {
+    long[] timestamps = new long[] {110L, 111L, 112L, 113L};
+    TSDataType[] dataTypes = {
+      TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32, TSDataType.BOOLEAN
+    };
+
+    // One value per timestamp in every column.
+    double[] doubles = new double[timestamps.length];
+    float[] floats = new float[timestamps.length];
+    long[] longs = new long[timestamps.length];
+    int[] ints = new int[timestamps.length];
+    // boolean[] elements default to false, the value every row carries.
+    boolean[] booleans = new boolean[timestamps.length];
+    for (int row = 0; row < timestamps.length; row++) {
+      doubles[row] = 1.0;
+      floats[row] = 2;
+      longs[row] = 10000;
+      ints[row] = 100;
+    }
+    Object[] columns = new Object[] {doubles, floats, longs, ints, booleans};
+
+    MeasurementSchema[] schemas =
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        };
+
+    InsertTabletNode tabletNode =
+        new InsertTabletNode(
+            new PlanNodeId("plannode 1"),
+            new PartialPath("root.isp.d1"),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            dataTypes,
+            timestamps,
+            null,
+            columns,
+            timestamps.length);
+    tabletNode.setMeasurementSchemas(schemas);
+
+    return tabletNode;
+  }
 }