You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/18 02:48:08 UTC

[iotdb] branch master updated: Support logic view - move schema validation to analyze phase (#9767)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ef10a20a3e Support logic view - move schema validation to analyze phase (#9767)
ef10a20a3e is described below

commit ef10a20a3e5df126674787fd08208cb1ff06c5a2
Author: Haonan <hh...@outlook.com>
AuthorDate: Thu May 18 10:48:02 2023 +0800

    Support logic view - move schema validation to analyze phase (#9767)
---
 .../execution/executor/RegionWriteExecutor.java    |  40 +----
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  16 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  92 +++++++++--
 .../mpp/plan/analyze/schema/SchemaValidator.java   |  27 +--
 .../db/mpp/plan/execution/QueryExecution.java      |  10 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  70 +++++---
 .../planner/plan/node/write/BatchInsertNode.java   |  33 ----
 .../plan/node/write/InsertMultiTabletsNode.java    |  22 +--
 .../plan/planner/plan/node/write/InsertNode.java   | 126 +-------------
 .../planner/plan/node/write/InsertRowNode.java     | 171 +++----------------
 .../planner/plan/node/write/InsertRowsNode.java    |  33 +---
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  32 +---
 .../planner/plan/node/write/InsertTabletNode.java  | 145 +++-------------
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   5 +-
 .../plan/statement/crud/InsertBaseStatement.java   | 157 +++++++++++++++++-
 .../crud/InsertMultiTabletsStatement.java          |  30 ++++
 .../plan/statement/crud/InsertRowStatement.java    | 183 ++++++++++++++++++++-
 .../crud/InsertRowsOfOneDeviceStatement.java       |  42 +++++
 .../plan/statement/crud/InsertRowsStatement.java   |  41 +++++
 .../plan/statement/crud/InsertTabletStatement.java | 155 ++++++++++++++++-
 .../db/wal/recover/file/TsFilePlanRedoer.java      |   4 -
 .../db/engine/storagegroup/DataRegionTest.java     |  16 +-
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |   7 +-
 .../org/apache/iotdb/db/wal/io/WALFileTest.java    |  25 ++-
 .../iotdb/db/wal/node/ConsensusReqReaderTest.java  |  27 ++-
 .../org/apache/iotdb/db/wal/node/WALNodeTest.java  |  26 ++-
 .../db/wal/recover/file/TsFilePlanRedoerTest.java  |  32 ++--
 .../file/UnsealedTsFileRecoverPerformerTest.java   |   5 +-
 28 files changed, 883 insertions(+), 689 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index ab6f614cf5..059cfbc65b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
-import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -37,12 +36,10 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
@@ -226,36 +223,8 @@ public class RegionWriteExecutor {
     private RegionExecutionResult executeDataInsert(
         InsertNode insertNode, WritePlanNodeExecutionContext context) {
       RegionExecutionResult response = new RegionExecutionResult();
-      // data insertion should be blocked by data deletion, especially when deleting timeseries
-      final long startTime = System.nanoTime();
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
-        try {
-          SchemaValidator.validate(insertNode);
-        } catch (SemanticException e) {
-          response.setAccepted(false);
-          response.setMessage(e.getMessage());
-          if (e.getCause() instanceof IoTDBException) {
-            IoTDBException ioTDBException = (IoTDBException) e.getCause();
-            response.setStatus(
-                RpcUtils.getStatus(ioTDBException.getErrorCode(), ioTDBException.getMessage()));
-          } else {
-            response.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
-          }
-          return response;
-        } finally {
-          PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(
-              System.nanoTime() - startTime);
-        }
-        boolean hasFailedMeasurement = insertNode.hasFailedMeasurements();
-        String partialInsertMessage = null;
-        if (hasFailedMeasurement) {
-          partialInsertMessage =
-              String.format(
-                  "Fail to insert measurements %s caused by %s",
-                  insertNode.getFailedMeasurements(), insertNode.getFailedMessages());
-          LOGGER.warn(partialInsertMessage);
-        }
 
         ConsensusWriteResponse writeResponse =
             fireTriggerAndInsert(context.getRegionId(), insertNode);
@@ -263,17 +232,10 @@ public class RegionWriteExecutor {
         // TODO need consider more status
         if (writeResponse.getStatus() != null) {
           response.setAccepted(
-              !hasFailedMeasurement
-                  && TSStatusCode.SUCCESS_STATUS.getStatusCode()
-                      == writeResponse.getStatus().getCode());
+              TSStatusCode.SUCCESS_STATUS.getStatusCode() == writeResponse.getStatus().getCode());
           if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != writeResponse.getStatus().getCode()) {
             response.setMessage(writeResponse.getStatus().message);
             response.setStatus(writeResponse.getStatus());
-          } else if (hasFailedMeasurement) {
-            response.setMessage(partialInsertMessage);
-            response.setStatus(
-                RpcUtils.getStatus(
-                    TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
           } else {
             response.setMessage(writeResponse.getStatus().message);
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 6b3f4780a6..3ff74d6398 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.analyze;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
@@ -80,9 +81,10 @@ public class Analysis {
 
   private boolean finishQueryAfterAnalyze;
 
-  // potential fail message when finishQueryAfterAnalyze is true. If failMessage is NULL, means no
+  // potential fail status when finishQueryAfterAnalyze is true. If failStatus is NULL, means no
   // fail.
-  private String failMessage;
+
+  private TSStatus failStatus;
 
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // Query Analysis (used in ALIGN BY TIME)
@@ -407,15 +409,15 @@ public class Analysis {
   }
 
   public boolean isFailed() {
-    return failMessage != null;
+    return failStatus != null;
   }
 
-  public String getFailMessage() {
-    return failMessage;
+  public TSStatus getFailStatus() {
+    return this.failStatus;
   }
 
-  public void setFailMessage(String failMessage) {
-    this.failMessage = failMessage;
+  public void setFailStatus(TSStatus status) {
+    this.failStatus = status;
   }
 
   public void setDeviceViewInputIndexesMap(Map<String, List<Integer>> deviceViewInputIndexesMap) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index a5c7bafe36..7313a600df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -31,6 +32,7 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
 import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeClientManager;
@@ -92,6 +94,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -137,6 +140,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.sync.ShowPipeSinkTypeStatement
 import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -206,6 +210,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
   private final IPartitionFetcher partitionFetcher;
   private final ISchemaFetcher schemaFetcher;
 
+  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
+      PerformanceOverviewMetrics.getInstance();
+
   public AnalyzeVisitor(IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
@@ -2031,32 +2038,48 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
   public Analysis visitInsertTablet(
       InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
+    Analysis analysis = new Analysis();
+    analysis.setStatement(insertTabletStatement);
+    validateSchema(analysis, insertTabletStatement);
+    if (analysis.isFinishQueryAfterAnalyze()) {
+      return analysis;
+    }
 
     DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
     dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
     dataPartitionQueryParam.setTimePartitionSlotList(insertTabletStatement.getTimePartitionSlots());
 
-    return getAnalysisForWriting(
-        insertTabletStatement, Collections.singletonList(dataPartitionQueryParam));
+    return getAnalysisForWriting(analysis, Collections.singletonList(dataPartitionQueryParam));
   }
 
   @Override
   public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
+    Analysis analysis = new Analysis();
+    analysis.setStatement(insertRowStatement);
+    validateSchema(analysis, insertRowStatement);
+    if (analysis.isFinishQueryAfterAnalyze()) {
+      return analysis;
+    }
 
     DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
     dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
     dataPartitionQueryParam.setTimePartitionSlotList(
         Collections.singletonList(insertRowStatement.getTimePartitionSlot()));
 
-    return getAnalysisForWriting(
-        insertRowStatement, Collections.singletonList(dataPartitionQueryParam));
+    return getAnalysisForWriting(analysis, Collections.singletonList(dataPartitionQueryParam));
   }
 
   @Override
   public Analysis visitInsertRows(
       InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
+    Analysis analysis = new Analysis();
+    analysis.setStatement(insertRowsStatement);
+    validateSchema(analysis, insertRowsStatement);
+    if (analysis.isFinishQueryAfterAnalyze()) {
+      return analysis;
+    }
 
     Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
     for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
@@ -2074,13 +2097,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       dataPartitionQueryParams.add(dataPartitionQueryParam);
     }
 
-    return getAnalysisForWriting(insertRowsStatement, dataPartitionQueryParams);
+    return getAnalysisForWriting(analysis, dataPartitionQueryParams);
   }
 
   @Override
   public Analysis visitInsertMultiTablets(
       InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
+    Analysis analysis = new Analysis();
+    analysis.setStatement(insertMultiTabletsStatement);
+    validateSchema(analysis, insertMultiTabletsStatement);
+    if (analysis.isFinishQueryAfterAnalyze()) {
+      return analysis;
+    }
 
     Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
     for (InsertTabletStatement insertTabletStatement :
@@ -2099,13 +2128,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       dataPartitionQueryParams.add(dataPartitionQueryParam);
     }
 
-    return getAnalysisForWriting(insertMultiTabletsStatement, dataPartitionQueryParams);
+    return getAnalysisForWriting(analysis, dataPartitionQueryParams);
   }
 
   @Override
   public Analysis visitInsertRowsOfOneDevice(
       InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
+    Analysis analysis = new Analysis();
+    analysis.setStatement(insertRowsOfOneDeviceStatement);
+    validateSchema(analysis, insertRowsOfOneDeviceStatement);
+    if (analysis.isFinishQueryAfterAnalyze()) {
+      return analysis;
+    }
 
     DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
     dataPartitionQueryParam.setDevicePath(
@@ -2113,8 +2148,37 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     dataPartitionQueryParam.setTimePartitionSlotList(
         insertRowsOfOneDeviceStatement.getTimePartitionSlots());
 
-    return getAnalysisForWriting(
-        insertRowsOfOneDeviceStatement, Collections.singletonList(dataPartitionQueryParam));
+    return getAnalysisForWriting(analysis, Collections.singletonList(dataPartitionQueryParam));
+  }
+
+  private void validateSchema(Analysis analysis, InsertBaseStatement insertStatement) {
+    final long startTime = System.nanoTime();
+    try {
+      SchemaValidator.validate(schemaFetcher, insertStatement);
+    } catch (SemanticException e) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      if (e.getCause() instanceof IoTDBException) {
+        IoTDBException exception = (IoTDBException) e.getCause();
+        analysis.setFailStatus(
+            RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage()));
+      } else {
+        analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage()));
+      }
+      return;
+    } finally {
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime);
+    }
+    boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements();
+    String partialInsertMessage;
+    if (hasFailedMeasurement) {
+      partialInsertMessage =
+          String.format(
+              "Fail to insert measurements %s caused by %s",
+              insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages());
+      logger.warn(partialInsertMessage);
+      analysis.setFailStatus(
+          RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
+    }
   }
 
   @Override
@@ -2199,16 +2263,17 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
   /** get analysis according to statement and params */
   private Analysis getAnalysisForWriting(
-      Statement statement, List<DataPartitionQueryParam> dataPartitionQueryParams) {
-    Analysis analysis = new Analysis();
-    analysis.setStatement(statement);
+      Analysis analysis, List<DataPartitionQueryParam> dataPartitionQueryParams) {
 
     DataPartition dataPartition =
         partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams);
     if (dataPartition.isEmpty()) {
       analysis.setFinishQueryAfterAnalyze(true);
-      analysis.setFailMessage(
-          "Database not exists and failed to create automatically because enable_auto_create_schema is FALSE.");
+      analysis.setFailStatus(
+          RpcUtils.getStatus(
+              TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(),
+              "Database not exists and failed to create automatically "
+                  + "because enable_auto_create_schema is FALSE."));
     }
     analysis.setDataPartitionInfo(dataPartition);
     return analysis;
@@ -2359,6 +2424,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     }
 
     return SchemaValidator.validate(
+        schemaFetcher,
         deviceList,
         measurementList,
         dataTypeList,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
index 6cbd587a00..5005b9f7a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/SchemaValidator.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.BatchInsertNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -33,30 +35,31 @@ import java.util.List;
 
 public class SchemaValidator {
 
-  private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
-
-  public static void validate(InsertNode insertNode) {
+  public static void validate(ISchemaFetcher schemaFetcher, InsertBaseStatement insertStatement) {
     try {
-      if (insertNode instanceof BatchInsertNode) {
-        SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(
-            ((BatchInsertNode) insertNode).getSchemaValidationList());
+      if (insertStatement instanceof InsertRowsStatement
+          || insertStatement instanceof InsertMultiTabletsStatement
+          || insertStatement instanceof InsertRowsOfOneDeviceStatement) {
+        schemaFetcher.fetchAndComputeSchemaWithAutoCreate(
+            insertStatement.getSchemaValidationList());
       } else {
-        SCHEMA_FETCHER.fetchAndComputeSchemaWithAutoCreate(insertNode.getSchemaValidation());
+        schemaFetcher.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidation());
       }
-      insertNode.updateAfterSchemaValidation();
+      insertStatement.updateAfterSchemaValidation();
     } catch (QueryProcessException e) {
-      throw new SemanticException(e);
+      throw new SemanticException(e.getMessage());
     }
   }
 
   public static ISchemaTree validate(
+      ISchemaFetcher schemaFetcher,
       List<PartialPath> devicePaths,
       List<String[]> measurements,
       List<TSDataType[]> dataTypes,
       List<TSEncoding[]> encodings,
       List<CompressionType[]> compressionTypes,
       List<Boolean> isAlignedList) {
-    return SCHEMA_FETCHER.fetchSchemaListWithAutoCreate(
+    return schemaFetcher.fetchSchemaListWithAutoCreate(
         devicePaths, measurements, dataTypes, encodings, compressionTypes, isAlignedList);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 715cf29c71..e6bcdeb728 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -202,7 +202,7 @@ public class QueryExecution implements IQueryExecution {
     if (skipExecute()) {
       logger.debug("[SkipExecute]");
       if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
-        stateMachine.transitionToFailed(new RuntimeException(analysis.getFailMessage()));
+        stateMachine.transitionToFailed(analysis.getFailStatus());
       } else {
         constructResultForMemorySource();
         stateMachine.transitionToRunning();
@@ -224,6 +224,14 @@ public class QueryExecution implements IQueryExecution {
     }
     PERFORMANCE_OVERVIEW_METRICS.recordPlanCost(System.nanoTime() - startTime);
     schedule();
+
+    // set partial insert error message
+    // When some columns in one insert failed, other column will continue executing insertion.
+    // The error message should be return to client, therefore we need to set it after the insertion
+    // of other column finished.
+    if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
+      stateMachine.transitionToFailed(analysis.getFailStatus());
+    }
   }
 
   private void checkTimeOutForQuery() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 1d02ae17c5..ed82734b76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -445,30 +445,38 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
   public PlanNode visitInsertTablet(
       InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
     // convert insert statement to insert node
-    return new InsertTabletNode(
-        context.getQueryId().genPlanNodeId(),
-        insertTabletStatement.getDevicePath(),
-        insertTabletStatement.isAligned(),
-        insertTabletStatement.getMeasurements(),
-        insertTabletStatement.getDataTypes(),
-        insertTabletStatement.getTimes(),
-        insertTabletStatement.getBitMaps(),
-        insertTabletStatement.getColumns(),
-        insertTabletStatement.getRowCount());
+    InsertTabletNode insertNode =
+        new InsertTabletNode(
+            context.getQueryId().genPlanNodeId(),
+            insertTabletStatement.getDevicePath(),
+            insertTabletStatement.isAligned(),
+            insertTabletStatement.getMeasurements(),
+            insertTabletStatement.getDataTypes(),
+            insertTabletStatement.getMeasurementSchemas(),
+            insertTabletStatement.getTimes(),
+            insertTabletStatement.getBitMaps(),
+            insertTabletStatement.getColumns(),
+            insertTabletStatement.getRowCount());
+    insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
+    return insertNode;
   }
 
   @Override
   public PlanNode visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
     // convert insert statement to insert node
-    return new InsertRowNode(
-        context.getQueryId().genPlanNodeId(),
-        insertRowStatement.getDevicePath(),
-        insertRowStatement.isAligned(),
-        insertRowStatement.getMeasurements(),
-        insertRowStatement.getDataTypes(),
-        insertRowStatement.getTime(),
-        insertRowStatement.getValues(),
-        insertRowStatement.isNeedInferType());
+    InsertRowNode insertNode =
+        new InsertRowNode(
+            context.getQueryId().genPlanNodeId(),
+            insertRowStatement.getDevicePath(),
+            insertRowStatement.isAligned(),
+            insertRowStatement.getMeasurements(),
+            insertRowStatement.getDataTypes(),
+            insertRowStatement.getMeasurementSchemas(),
+            insertRowStatement.getTime(),
+            insertRowStatement.getValues(),
+            insertRowStatement.isNeedInferType());
+    insertNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
+    return insertNode;
   }
 
   @Override
@@ -629,17 +637,19 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
     for (int i = 0; i < insertRowsStatement.getInsertRowStatementList().size(); i++) {
       InsertRowStatement insertRowStatement =
           insertRowsStatement.getInsertRowStatementList().get(i);
-      insertRowsNode.addOneInsertRowNode(
+      InsertRowNode insertRowNode =
           new InsertRowNode(
               insertRowsNode.getPlanNodeId(),
               insertRowStatement.getDevicePath(),
               insertRowStatement.isAligned(),
               insertRowStatement.getMeasurements(),
               insertRowStatement.getDataTypes(),
+              insertRowStatement.getMeasurementSchemas(),
               insertRowStatement.getTime(),
               insertRowStatement.getValues(),
-              insertRowStatement.isNeedInferType()),
-          i);
+              insertRowStatement.isNeedInferType());
+      insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
+      insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
     return insertRowsNode;
   }
@@ -653,18 +663,21 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
     for (int i = 0; i < insertMultiTabletsStatement.getInsertTabletStatementList().size(); i++) {
       InsertTabletStatement insertTabletStatement =
           insertMultiTabletsStatement.getInsertTabletStatementList().get(i);
-      insertMultiTabletsNode.addInsertTabletNode(
+      InsertTabletNode insertTabletNode =
           new InsertTabletNode(
               insertMultiTabletsNode.getPlanNodeId(),
               insertTabletStatement.getDevicePath(),
               insertTabletStatement.isAligned(),
               insertTabletStatement.getMeasurements(),
               insertTabletStatement.getDataTypes(),
+              insertTabletStatement.getMeasurementSchemas(),
               insertTabletStatement.getTimes(),
               insertTabletStatement.getBitMaps(),
               insertTabletStatement.getColumns(),
-              insertTabletStatement.getRowCount()),
-          i);
+              insertTabletStatement.getRowCount());
+      insertTabletNode.setFailedMeasurementNumber(
+          insertTabletStatement.getFailedMeasurementNumber());
+      insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, i);
     }
     return insertMultiTabletsNode;
   }
@@ -681,16 +694,19 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
     for (int i = 0; i < insertRowsOfOneDeviceStatement.getInsertRowStatementList().size(); i++) {
       InsertRowStatement insertRowStatement =
           insertRowsOfOneDeviceStatement.getInsertRowStatementList().get(i);
-      insertRowNodeList.add(
+      InsertRowNode insertRowNode =
           new InsertRowNode(
               insertRowsOfOneDeviceNode.getPlanNodeId(),
               insertRowStatement.getDevicePath(),
               insertRowStatement.isAligned(),
               insertRowStatement.getMeasurements(),
               insertRowStatement.getDataTypes(),
+              insertRowStatement.getMeasurementSchemas(),
               insertRowStatement.getTime(),
               insertRowStatement.getValues(),
-              insertRowStatement.isNeedInferType()));
+              insertRowStatement.isNeedInferType());
+      insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
+      insertRowNodeList.add(insertRowNode);
       insertRowNodeIndexList.add(i);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/BatchInsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/BatchInsertNode.java
deleted file mode 100644
index a0c774d5c4..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/BatchInsertNode.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
-
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
-
-import java.util.List;
-
-/**
- * BatchInsertNode contains multiple sub insert. Insert node which contains multiple sub insert
- * nodes needs to implement it.
- */
-public interface BatchInsertNode {
-
-  List<ISchemaValidation> getSchemaValidationList();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index d0b46b103a..5a4e198b77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -22,14 +22,12 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -40,9 +38,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
-public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNode {
+public class InsertMultiTabletsNode extends InsertNode {
 
   /**
    * the value is used to indict the parent InsertTabletNode's index when the parent
@@ -135,11 +132,6 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
     insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
   }
 
-  @Override
-  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
-    return false;
-  }
-
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<TRegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
@@ -193,13 +185,6 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
     return null;
   }
 
-  @Override
-  public List<ISchemaValidation> getSchemaValidationList() {
-    return insertTabletNodeList.stream()
-        .map(InsertTabletNode::getSchemaValidation)
-        .collect(Collectors.toList());
-  }
-
   public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
     PlanNodeId planNodeId;
     List<InsertTabletNode> insertTabletNodeList = new ArrayList<>();
@@ -278,9 +263,4 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
   public long getMinTime() {
     throw new NotImplementedException();
   }
-
-  @Override
-  public Object getFirstValueOfIndex(int index) {
-    throw new NotImplementedException();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 07c4b86c06..c191f03837 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -20,13 +20,10 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -41,11 +38,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 public abstract class InsertNode extends WritePlanNode {
 
@@ -62,11 +55,8 @@ public abstract class InsertNode extends WritePlanNode {
   protected MeasurementSchema[] measurementSchemas;
   protected String[] measurements;
   protected TSDataType[] dataTypes;
-  // TODO(INSERT) need to change it to a function handle to update last time value
-  //  protected IMeasurementMNode[] measurementMNodes;
 
-  /** index of failed measurements -> info including measurement, data type and value */
-  protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
+  protected int failedMeasurementNumber = 0;
 
   /**
    * device id reference, for reuse device id in both id table and memtable <br>
@@ -242,56 +232,8 @@ public abstract class InsertNode extends WritePlanNode {
     return dataRegionReplicaSet;
   }
 
-  public ISchemaValidation getSchemaValidation() {
-    throw new UnsupportedOperationException();
-  }
-
-  public void updateAfterSchemaValidation() throws QueryProcessException {}
-
-  /** Check whether data types are matched with measurement schemas */
-  protected void selfCheckDataTypes(int index)
-      throws DataTypeMismatchException, PathNotExistException {
-    if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-      // if enable partial insert, mark failed measurements with exception
-      if (measurementSchemas[index] == null) {
-        markFailedMeasurement(
-            index,
-            new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath()));
-      } else if ((dataTypes[index] != measurementSchemas[index].getType()
-          && !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
-        markFailedMeasurement(
-            index,
-            new DataTypeMismatchException(
-                devicePath.getFullPath(),
-                measurements[index],
-                dataTypes[index],
-                measurementSchemas[index].getType(),
-                getMinTime(),
-                getFirstValueOfIndex(index)));
-      }
-    } else {
-      // if not enable partial insert, throw the exception directly
-      if (measurementSchemas[index] == null) {
-        throw new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath());
-      } else if ((dataTypes[index] != measurementSchemas[index].getType()
-          && !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
-        throw new DataTypeMismatchException(
-            devicePath.getFullPath(),
-            measurements[index],
-            dataTypes[index],
-            measurementSchemas[index].getType(),
-            getMinTime(),
-            getFirstValueOfIndex(index));
-      }
-    }
-  }
-
-  protected abstract boolean checkAndCastDataType(int columnIndex, TSDataType dataType);
-
   public abstract long getMinTime();
 
-  public abstract Object getFirstValueOfIndex(int index);
-
   /**
    * Notice: Call this method ONLY when using IOT_CONSENSUS, other consensus protocol cannot
    * distinguish whether the insertNode sync from leader by this method.
@@ -302,16 +244,8 @@ public abstract class InsertNode extends WritePlanNode {
   }
 
   // region partial insert
-  /**
-   * Mark failed measurement, measurements[index], dataTypes[index] and values/columns[index] would
-   * be null. We'd better use "measurements[index] == null" to determine if the measurement failed.
-   * <br>
-   * This method is not concurrency-safe.
-   *
-   * @param index failed measurement index
-   * @param cause cause Exception of failure
-   */
-  public void markFailedMeasurement(int index, Exception cause) {
+  @TestOnly
+  public void markFailedMeasurement(int index) {
     throw new UnsupportedOperationException();
   }
 
@@ -324,58 +258,12 @@ public abstract class InsertNode extends WritePlanNode {
     return false;
   }
 
-  public boolean hasFailedMeasurements() {
-    return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty();
+  public void setFailedMeasurementNumber(int failedMeasurementNumber) {
+    this.failedMeasurementNumber = failedMeasurementNumber;
   }
 
   public int getFailedMeasurementNumber() {
-    return failedMeasurementIndex2Info == null ? 0 : failedMeasurementIndex2Info.size();
-  }
-
-  public List<String> getFailedMeasurements() {
-    return failedMeasurementIndex2Info == null
-        ? Collections.emptyList()
-        : failedMeasurementIndex2Info.values().stream()
-            .map(info -> info.measurement)
-            .collect(Collectors.toList());
-  }
-
-  public List<Exception> getFailedExceptions() {
-    return failedMeasurementIndex2Info == null
-        ? Collections.emptyList()
-        : failedMeasurementIndex2Info.values().stream()
-            .map(info -> info.cause)
-            .collect(Collectors.toList());
-  }
-
-  public List<String> getFailedMessages() {
-    return failedMeasurementIndex2Info == null
-        ? Collections.emptyList()
-        : failedMeasurementIndex2Info.values().stream()
-            .map(
-                info -> {
-                  Throwable cause = info.cause;
-                  while (cause.getCause() != null) {
-                    cause = cause.getCause();
-                  }
-                  return cause.getMessage();
-                })
-            .collect(Collectors.toList());
-  }
-
-  protected static class FailedMeasurementInfo {
-    protected String measurement;
-    protected TSDataType dataType;
-    protected Object value;
-    protected Exception cause;
-
-    public FailedMeasurementInfo(
-        String measurement, TSDataType dataType, Object value, Exception cause) {
-      this.measurement = measurement;
-      this.dataType = dataType;
-      this.value = value;
-      this.cause = cause;
-    }
+    return failedMeasurementNumber;
   }
   // endregion
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
index e6f5a4099a..d7839f0997 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowNode.java
@@ -19,56 +19,38 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 
-public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaValidation {
-
-  private static final Logger logger = LoggerFactory.getLogger(InsertRowNode.class);
+public class InsertRowNode extends InsertNode implements WALEntryValue {
 
   private static final byte TYPE_RAW_STRING = -1;
 
@@ -83,16 +65,34 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
     super(id);
   }
 
+  @TestOnly
+  public InsertRowNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      long time,
+      Object[] values,
+      boolean isNeedInferType) {
+    super(id, devicePath, isAligned, measurements, dataTypes);
+    this.time = time;
+    this.values = values;
+    this.isNeedInferType = isNeedInferType;
+  }
+
   public InsertRowNode(
       PlanNodeId id,
       PartialPath devicePath,
       boolean isAligned,
       String[] measurements,
       TSDataType[] dataTypes,
+      MeasurementSchema[] measurementSchemas,
       long time,
       Object[] values,
       boolean isNeedInferType) {
     super(id, devicePath, isAligned, measurements, dataTypes);
+    this.measurementSchemas = measurementSchemas;
     this.time = time;
     this.values = values;
     this.isNeedInferType = isNeedInferType;
@@ -157,16 +157,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
     }
   }
 
-  @Override
-  public TSEncoding getEncoding(int index) {
-    return null;
-  }
-
-  @Override
-  public CompressionType getCompressionType(int index) {
-    return null;
-  }
-
   public Object[] getValues() {
     return values;
   }
@@ -197,81 +187,10 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
   }
 
   @Override
-  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
-    if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
-      logger.warn(
-          "Inserting to {}.{} : Cast from {} to {}",
-          devicePath,
-          measurements[columnIndex],
-          dataTypes[columnIndex],
-          dataType);
-      values[columnIndex] =
-          CommonUtils.castValue(dataTypes[columnIndex], dataType, values[columnIndex]);
-      dataTypes[columnIndex] = dataType;
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * transfer String[] values to specific data types when isNeedInferType is true. <br>
-   * Notice: measurementSchemas must be initialized before calling this method
-   */
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  public void transferType() throws QueryProcessException {
-
-    for (int i = 0; i < measurementSchemas.length; i++) {
-      // null when time series doesn't exist
-      if (measurementSchemas[i] == null) {
-        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-          throw new QueryProcessException(
-              new PathNotExistException(
-                  devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
-        } else {
-          markFailedMeasurement(
-              i,
-              new QueryProcessException(
-                  new PathNotExistException(
-                      devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])));
-        }
-        continue;
-      }
-      // parse string value to specific type
-      dataTypes[i] = measurementSchemas[i].getType();
-      try {
-        values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
-      } catch (Exception e) {
-        logger.warn(
-            "data type of {}.{} is not consistent, registered type {}, inserting timestamp {}, value {}",
-            devicePath,
-            measurements[i],
-            dataTypes[i],
-            time,
-            values[i]);
-        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
-          throw e;
-        } else {
-          markFailedMeasurement(i, e);
-        }
-      }
-    }
-    isNeedInferType = false;
-  }
-
-  @Override
-  public void markFailedMeasurement(int index, Exception cause) {
+  public void markFailedMeasurement(int index) {
     if (measurements[index] == null) {
       return;
     }
-
-    if (failedMeasurementIndex2Info == null) {
-      failedMeasurementIndex2Info = new HashMap<>();
-    }
-
-    FailedMeasurementInfo failedMeasurementInfo =
-        new FailedMeasurementInfo(measurements[index], dataTypes[index], values[index], cause);
-    failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
-
     measurements[index] = null;
     dataTypes[index] = null;
     values[index] = null;
@@ -527,11 +446,6 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
     return getTime();
   }
 
-  @Override
-  public Object getFirstValueOfIndex(int index) {
-    return values[index];
-  }
-
   // region serialize & deserialize methods for WAL
   /** Serialized size for wal */
   @Override
@@ -810,49 +724,4 @@ public class InsertRowNode extends InsertNode implements WALEntryValue, ISchemaV
     Object value = values[columnIndex];
     return new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value));
   }
-
-  @Override
-  public void validateDeviceSchema(boolean isAligned) {
-    if (this.isAligned != isAligned) {
-      throw new SemanticException(
-          new AlignedTimeseriesException(
-              String.format(
-                  "timeseries under this device are%s aligned, " + "please use %s interface",
-                  isAligned ? "" : " not", isAligned ? "aligned" : "non-aligned"),
-              devicePath.getFullPath()));
-    }
-  }
-
-  @Override
-  public ISchemaValidation getSchemaValidation() {
-    return this;
-  }
-
-  @Override
-  public void updateAfterSchemaValidation() throws QueryProcessException {
-    if (isNeedInferType) {
-      transferType();
-    }
-  }
-
-  @Override
-  public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
-    if (measurementSchemas == null) {
-      measurementSchemas = new MeasurementSchema[measurements.length];
-    }
-    if (measurementSchemaInfo == null) {
-      measurementSchemas[index] = null;
-    } else {
-      measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
-    }
-    if (isNeedInferType) {
-      return;
-    }
-
-    try {
-      selfCheckDataTypes(index);
-    } catch (DataTypeMismatchException | PathNotExistException e) {
-      throw new SemanticException(e);
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 5921c033f6..050839f642 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -22,9 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -32,7 +30,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -43,9 +40,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
-public class InsertRowsNode extends InsertNode implements BatchInsertNode {
+public class InsertRowsNode extends InsertNode {
 
   /**
    * Suppose there is an InsertRowsNode, which contains 5 InsertRowNodes,
@@ -121,11 +117,6 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
   @Override
   public void addChild(PlanNode child) {}
 
-  @Override
-  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
-    return false;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -156,23 +147,6 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
     return null;
   }
 
-  @Override
-  public List<ISchemaValidation> getSchemaValidationList() {
-    return insertRowNodeList.stream()
-        .map(InsertRowNode::getSchemaValidation)
-        .collect(Collectors.toList());
-  }
-
-  @Override
-  public void updateAfterSchemaValidation() throws QueryProcessException {
-    for (InsertRowNode insertRowNode : insertRowNodeList) {
-      insertRowNode.updateAfterSchemaValidation();
-      if (!this.hasFailedMeasurements() && insertRowNode.hasFailedMeasurements()) {
-        this.failedMeasurementIndex2Info = insertRowNode.failedMeasurementIndex2Info;
-      }
-    }
-  }
-
   public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {
     PlanNodeId planNodeId;
     List<InsertRowNode> insertRowNodeList = new ArrayList<>();
@@ -266,9 +240,4 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
   public long getMinTime() {
     throw new NotImplementedException();
   }
-
-  @Override
-  public Object getFirstValueOfIndex(int index) {
-    throw new NotImplementedException();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 32b3e1c08d..dd788ea4db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.StatusUtils;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
@@ -47,9 +45,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
-public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsertNode {
+public class InsertRowsOfOneDeviceNode extends InsertNode {
 
   /**
    * Suppose there is an InsertRowsOfOneDeviceNode, which contains 5 InsertRowNodes,
@@ -144,11 +141,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
     return null;
   }
 
-  @Override
-  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
-    return false;
-  }
-
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     List<WritePlanNode> result = new ArrayList<>();
@@ -291,23 +283,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
     return Objects.hash(super.hashCode(), insertRowNodeIndexList, insertRowNodeList);
   }
 
-  @Override
-  public List<ISchemaValidation> getSchemaValidationList() {
-    return insertRowNodeList.stream()
-        .map(InsertRowNode::getSchemaValidation)
-        .collect(Collectors.toList());
-  }
-
-  @Override
-  public void updateAfterSchemaValidation() throws QueryProcessException {
-    for (InsertRowNode insertRowNode : insertRowNodeList) {
-      insertRowNode.updateAfterSchemaValidation();
-      if (!this.hasFailedMeasurements() && insertRowNode.hasFailedMeasurements()) {
-        this.failedMeasurementIndex2Info = insertRowNode.failedMeasurementIndex2Info;
-      }
-    }
-  }
-
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRowsOfOneDevice(this, context);
@@ -317,9 +292,4 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
   public long getMinTime() {
     throw new NotImplementedException();
   }
-
-  @Override
-  public Object getFirstValueOfIndex(int index) {
-    throw new NotImplementedException();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 09fa7c92b7..2501c535a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -23,19 +23,12 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
-import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -43,9 +36,7 @@ import org.apache.iotdb.db.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.wal.utils.WALWriteUtils;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -54,9 +45,6 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -69,9 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class InsertTabletNode extends InsertNode implements WALEntryValue, ISchemaValidation {
-
-  private static final Logger logger = LoggerFactory.getLogger(InsertTabletNode.class);
+public class InsertTabletNode extends InsertNode implements WALEntryValue {
 
   private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
 
@@ -97,17 +83,37 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     super(id);
   }
 
+  @TestOnly
+  public InsertTabletNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      long[] times,
+      BitMap[] bitMaps,
+      Object[] columns,
+      int rowCount) {
+    super(id, devicePath, isAligned, measurements, dataTypes);
+    this.times = times;
+    this.bitMaps = bitMaps;
+    this.columns = columns;
+    this.rowCount = rowCount;
+  }
+
   public InsertTabletNode(
       PlanNodeId id,
       PartialPath devicePath,
       boolean isAligned,
       String[] measurements,
       TSDataType[] dataTypes,
+      MeasurementSchema[] measurementSchemas,
       long[] times,
       BitMap[] bitMaps,
       Object[] columns,
       int rowCount) {
     super(id, devicePath, isAligned, measurements, dataTypes);
+    this.measurementSchemas = measurementSchemas;
     this.times = times;
     this.bitMaps = bitMaps;
     this.columns = columns;
@@ -177,23 +183,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     return null;
   }
 
-  @Override
-  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
-    if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
-      logger.warn(
-          "Inserting to {}.{} : Cast from {} to {}",
-          devicePath,
-          measurements[columnIndex],
-          dataTypes[columnIndex],
-          dataType);
-      columns[columnIndex] =
-          CommonUtils.castArray(dataTypes[columnIndex], dataType, columns[columnIndex]);
-      dataTypes[columnIndex] = dataType;
-      return true;
-    }
-    return false;
-  }
-
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     // only single device in single database
@@ -287,6 +276,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
                 isAligned,
                 measurements,
                 dataTypes,
+                measurementSchemas,
                 subTimes,
                 bitMaps,
                 values,
@@ -357,19 +347,10 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
   }
 
   @Override
-  public void markFailedMeasurement(int index, Exception cause) {
+  public void markFailedMeasurement(int index) {
     if (measurements[index] == null) {
       return;
     }
-
-    if (failedMeasurementIndex2Info == null) {
-      failedMeasurementIndex2Info = new HashMap<>();
-    }
-
-    FailedMeasurementInfo failedMeasurementInfo =
-        new FailedMeasurementInfo(measurements[index], dataTypes[index], columns[index], cause);
-    failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
-
     measurements[index] = null;
     dataTypes[index] = null;
     columns[index] = null;
@@ -380,41 +361,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     return times[0];
   }
 
-  @Override
-  public Object getFirstValueOfIndex(int index) {
-    Object value;
-    switch (dataTypes[index]) {
-      case INT32:
-        int[] intValues = (int[]) columns[index];
-        value = intValues[0];
-        break;
-      case INT64:
-        long[] longValues = (long[]) columns[index];
-        value = longValues[0];
-        break;
-      case FLOAT:
-        float[] floatValues = (float[]) columns[index];
-        value = floatValues[0];
-        break;
-      case DOUBLE:
-        double[] doubleValues = (double[]) columns[index];
-        value = doubleValues[0];
-        break;
-      case BOOLEAN:
-        boolean[] boolValues = (boolean[]) columns[index];
-        value = boolValues[0];
-        break;
-      case TEXT:
-        Binary[] binaryValues = (Binary[]) columns[index];
-        value = binaryValues[0];
-        break;
-      default:
-        throw new UnSupportedDataTypeException(
-            String.format(DATATYPE_UNSUPPORTED, dataTypes[index]));
-    }
-    return value;
-  }
-
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.INSERT_TABLET.serialize(byteBuffer);
@@ -1122,49 +1068,4 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
     }
     return new TimeValuePair(times[lastIdx], value);
   }
-
-  @Override
-  public TSEncoding getEncoding(int index) {
-    return null;
-  }
-
-  @Override
-  public CompressionType getCompressionType(int index) {
-    return null;
-  }
-
-  @Override
-  public void validateDeviceSchema(boolean isAligned) {
-    if (this.isAligned != isAligned) {
-      throw new SemanticException(
-          new AlignedTimeseriesException(
-              String.format(
-                  "timeseries under this device are%s aligned, " + "please use %s interface",
-                  isAligned ? "" : " not", isAligned ? "aligned" : "non-aligned"),
-              devicePath.getFullPath()));
-    }
-  }
-
-  @Override
-  public ISchemaValidation getSchemaValidation() {
-    return this;
-  }
-
-  @Override
-  public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
-    if (measurementSchemas == null) {
-      measurementSchemas = new MeasurementSchema[measurements.length];
-    }
-    if (measurementSchemaInfo == null) {
-      measurementSchemas[index] = null;
-    } else {
-      measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
-    }
-
-    try {
-      selfCheckDataTypes(index);
-    } catch (DataTypeMismatchException | PathNotExistException e) {
-      throw new SemanticException(e);
-    }
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index cb64acaf6b..8d91843fef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -166,6 +166,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   }
 
   private Future<FragInstanceDispatchResult> dispatchWriteAsync(List<FragmentInstance> instances) {
+    List<TSStatus> dataNodeFailureList = new ArrayList<>();
     // split local and remote instances
     List<FragmentInstance> localInstances = new ArrayList<>();
     List<FragmentInstance> remoteInstances = new ArrayList<>();
@@ -182,14 +183,12 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
         new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances);
     asyncPlanNodeSender.sendAll();
 
-    List<TSStatus> dataNodeFailureList = new ArrayList<>();
-
     if (!localInstances.isEmpty()) {
       // sync dispatch to local
       long localScheduleStartTime = System.nanoTime();
       for (FragmentInstance localInstance : localInstances) {
         try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
-          dispatchOneInstance(localInstance);
+          dispatchLocally(localInstance);
         } catch (FragmentInstanceDispatchException e) {
           dataNodeFailureList.add(e.getFailureStatus());
         } catch (Throwable t) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index d2d6d55ea1..d6c9941c58 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -19,11 +19,19 @@
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 public abstract class InsertBaseStatement extends Statement {
 
@@ -35,10 +43,15 @@ public abstract class InsertBaseStatement extends Statement {
 
   protected boolean isAligned;
 
+  protected MeasurementSchema[] measurementSchemas;
+
   protected String[] measurements;
   // get from client
   protected TSDataType[] dataTypes;
 
+  /** index of failed measurements -> info including measurement, data type and value */
+  protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
+
   public PartialPath getDevicePath() {
     return devicePath;
   }
@@ -55,12 +68,12 @@ public abstract class InsertBaseStatement extends Statement {
     this.measurements = measurements;
   }
 
-  public TSDataType[] getDataTypes() {
-    return dataTypes;
+  public MeasurementSchema[] getMeasurementSchemas() {
+    return measurementSchemas;
   }
 
-  public void setDataTypes(TSDataType[] dataTypes) {
-    this.dataTypes = dataTypes;
+  public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+    this.measurementSchemas = measurementSchemas;
   }
 
   public boolean isAligned() {
@@ -71,6 +84,14 @@ public abstract class InsertBaseStatement extends Statement {
     isAligned = aligned;
   }
 
+  public TSDataType[] getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(TSDataType[] dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
   /** Returns true when this statement is empty and no need to write into the server */
   public abstract boolean isEmpty();
 
@@ -78,4 +99,132 @@ public abstract class InsertBaseStatement extends Statement {
   public List<PartialPath> getPaths() {
     return Collections.emptyList();
   }
+
+  public abstract ISchemaValidation getSchemaValidation();
+
+  public abstract List<ISchemaValidation> getSchemaValidationList();
+
+  public void updateAfterSchemaValidation() throws QueryProcessException {}
+
+  /** Check whether data types are matched with measurement schemas */
+  protected void selfCheckDataTypes(int index)
+      throws DataTypeMismatchException, PathNotExistException {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+      // if enable partial insert, mark failed measurements with exception
+      if (measurementSchemas[index] == null) {
+        markFailedMeasurement(
+            index,
+            new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath()));
+      } else if ((dataTypes[index] != measurementSchemas[index].getType()
+          && !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
+        markFailedMeasurement(
+            index,
+            new DataTypeMismatchException(
+                devicePath.getFullPath(),
+                measurements[index],
+                dataTypes[index],
+                measurementSchemas[index].getType(),
+                getMinTime(),
+                getFirstValueOfIndex(index)));
+      }
+    } else {
+      // if not enable partial insert, throw the exception directly
+      if (measurementSchemas[index] == null) {
+        throw new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath());
+      } else if ((dataTypes[index] != measurementSchemas[index].getType()
+          && !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
+        throw new DataTypeMismatchException(
+            devicePath.getFullPath(),
+            measurements[index],
+            dataTypes[index],
+            measurementSchemas[index].getType(),
+            getMinTime(),
+            getFirstValueOfIndex(index));
+      }
+    }
+  }
+
+  protected abstract boolean checkAndCastDataType(int columnIndex, TSDataType dataType);
+
+  public abstract long getMinTime();
+
+  public abstract Object getFirstValueOfIndex(int index);
+
+  // region partial insert
+  /**
+   * Mark failed measurement, measurements[index], dataTypes[index] and values/columns[index] would
+   * be null. We'd better use "measurements[index] == null" to determine if the measurement failed.
+   * <br>
+   * This method is not concurrency-safe.
+   *
+   * @param index failed measurement index
+   * @param cause cause Exception of failure
+   */
+  public void markFailedMeasurement(int index, Exception cause) {
+    throw new UnsupportedOperationException();
+  }
+
+  public boolean hasValidMeasurements() {
+    for (Object o : measurements) {
+      if (o != null) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public boolean hasFailedMeasurements() {
+    return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty();
+  }
+
+  public int getFailedMeasurementNumber() {
+    return failedMeasurementIndex2Info == null ? 0 : failedMeasurementIndex2Info.size();
+  }
+
+  public List<String> getFailedMeasurements() {
+    return failedMeasurementIndex2Info == null
+        ? Collections.emptyList()
+        : failedMeasurementIndex2Info.values().stream()
+            .map(info -> info.measurement)
+            .collect(Collectors.toList());
+  }
+
+  public List<Exception> getFailedExceptions() {
+    return failedMeasurementIndex2Info == null
+        ? Collections.emptyList()
+        : failedMeasurementIndex2Info.values().stream()
+            .map(info -> info.cause)
+            .collect(Collectors.toList());
+  }
+
+  public List<String> getFailedMessages() {
+    return failedMeasurementIndex2Info == null
+        ? Collections.emptyList()
+        : failedMeasurementIndex2Info.values().stream()
+            .map(
+                info -> {
+                  Throwable cause = info.cause;
+                  while (cause.getCause() != null) {
+                    cause = cause.getCause();
+                  }
+                  return cause.getMessage();
+                })
+            .collect(Collectors.toList());
+  }
+
+  protected static class FailedMeasurementInfo {
+    protected String measurement;
+    protected TSDataType dataType;
+    protected Object value;
+    protected Exception cause;
+
+    public FailedMeasurementInfo(
+        String measurement, TSDataType dataType, Object value, Exception cause) {
+      this.measurement = measurement;
+      this.dataType = dataType;
+      this.value = value;
+      this.cause = cause;
+    }
+  }
+  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
index 77090231e6..2ee89dc69c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -20,12 +20,15 @@
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class InsertMultiTabletsStatement extends InsertBaseStatement {
 
@@ -95,4 +98,31 @@ public class InsertMultiTabletsStatement extends InsertBaseStatement {
     }
     return result;
   }
+
+  @Override
+  public ISchemaValidation getSchemaValidation() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<ISchemaValidation> getSchemaValidationList() {
+    return insertTabletStatementList.stream()
+        .map(InsertTabletStatement::getSchemaValidation)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    return false;
+  }
+
+  @Override
+  public long getMinTime() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Object getFirstValueOfIndex(int index) {
+    throw new NotImplementedException();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index 7fc5659d90..094cfe0739 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -20,19 +20,38 @@
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
-public class InsertRowStatement extends InsertBaseStatement {
+public class InsertRowStatement extends InsertBaseStatement implements ISchemaValidation {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertRowStatement.class);
 
   private static final byte TYPE_RAW_STRING = -1;
   private static final byte TYPE_NULL = -2;
@@ -130,4 +149,166 @@ public class InsertRowStatement extends InsertBaseStatement {
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitInsertRow(this, context);
   }
+
+  @Override
+  public long getMinTime() {
+    return getTime();
+  }
+
+  @Override
+  public Object getFirstValueOfIndex(int index) {
+    return values[index];
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
+      LOGGER.warn(
+          "Inserting to {}.{} : Cast from {} to {}",
+          devicePath,
+          measurements[columnIndex],
+          dataTypes[columnIndex],
+          dataType);
+      values[columnIndex] =
+          CommonUtils.castValue(dataTypes[columnIndex], dataType, values[columnIndex]);
+      dataTypes[columnIndex] = dataType;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * transfer String[] values to specific data types when isNeedInferType is true. <br>
+   * Notice: measurementSchemas must be initialized before calling this method
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  public void transferType() throws QueryProcessException {
+
+    for (int i = 0; i < measurementSchemas.length; i++) {
+      // null when time series doesn't exist
+      if (measurementSchemas[i] == null) {
+        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          throw new QueryProcessException(
+              new PathNotExistException(
+                  devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+        } else {
+          markFailedMeasurement(
+              i,
+              new QueryProcessException(
+                  new PathNotExistException(
+                      devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])));
+        }
+        continue;
+      }
+      // parse string value to specific type
+      dataTypes[i] = measurementSchemas[i].getType();
+      try {
+        values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+      } catch (Exception e) {
+        LOGGER.warn(
+            "data type of {}.{} is not consistent, "
+                + "registered type {}, inserting timestamp {}, value {}",
+            devicePath,
+            measurements[i],
+            dataTypes[i],
+            time,
+            values[i]);
+        if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          throw e;
+        } else {
+          markFailedMeasurement(i, e);
+        }
+      }
+    }
+    isNeedInferType = false;
+  }
+
+  @Override
+  public void markFailedMeasurement(int index, Exception cause) {
+    if (measurements[index] == null) {
+      return;
+    }
+
+    if (failedMeasurementIndex2Info == null) {
+      failedMeasurementIndex2Info = new HashMap<>();
+    }
+
+    InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo =
+        new InsertBaseStatement.FailedMeasurementInfo(
+            measurements[index], dataTypes[index], values[index], cause);
+    failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
+
+    measurements[index] = null;
+    dataTypes[index] = null;
+    values[index] = null;
+  }
+
+  @Override
+  public ISchemaValidation getSchemaValidation() {
+    return this;
+  }
+
+  @Override
+  public List<ISchemaValidation> getSchemaValidationList() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void updateAfterSchemaValidation() throws QueryProcessException {
+    if (isNeedInferType) {
+      transferType();
+    }
+  }
+
+  @Override
+  public TSDataType getDataType(int index) {
+    if (isNeedInferType) {
+      return TypeInferenceUtils.getPredictedDataType(values[index], true);
+    } else {
+      return dataTypes[index];
+    }
+  }
+
+  @Override
+  public TSEncoding getEncoding(int index) {
+    return null;
+  }
+
+  @Override
+  public CompressionType getCompressionType(int index) {
+    return null;
+  }
+
+  @Override
+  public void validateDeviceSchema(boolean isAligned) {
+    if (this.isAligned != isAligned) {
+      throw new SemanticException(
+          new AlignedTimeseriesException(
+              String.format(
+                  "timeseries under this device are%s aligned, " + "please use %s interface",
+                  isAligned ? "" : " not", isAligned ? "aligned" : "non-aligned"),
+              devicePath.getFullPath()));
+    }
+  }
+
+  @Override
+  public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
+    if (measurementSchemas == null) {
+      measurementSchemas = new MeasurementSchema[measurements.length];
+    }
+    if (measurementSchemaInfo == null) {
+      measurementSchemas[index] = null;
+    } else {
+      measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
+    }
+    if (isNeedInferType) {
+      return;
+    }
+
+    try {
+      selfCheckDataTypes(index);
+    } catch (DataTypeMismatchException | PathNotExistException e) {
+      throw new SemanticException(e);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index 1507dd112b..65c1f106a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -21,14 +21,19 @@ package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
 
@@ -94,4 +99,41 @@ public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement {
     }
     return ret;
   }
+
+  @Override
+  public ISchemaValidation getSchemaValidation() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<ISchemaValidation> getSchemaValidationList() {
+    return insertRowStatementList.stream()
+        .map(InsertRowStatement::getSchemaValidation)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void updateAfterSchemaValidation() throws QueryProcessException {
+    for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+      insertRowStatement.updateAfterSchemaValidation();
+      if (!this.hasFailedMeasurements() && insertRowStatement.hasFailedMeasurements()) {
+        this.failedMeasurementIndex2Info = insertRowStatement.failedMeasurementIndex2Info;
+      }
+    }
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    return false;
+  }
+
+  @Override
+  public long getMinTime() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Object getFirstValueOfIndex(int index) {
+    throw new NotImplementedException();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
index c2b24e6926..0f0f973b94 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
@@ -20,12 +20,16 @@
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class InsertRowsStatement extends InsertBaseStatement {
 
@@ -95,4 +99,41 @@ public class InsertRowsStatement extends InsertBaseStatement {
     }
     return result;
   }
+
+  @Override
+  public ISchemaValidation getSchemaValidation() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<ISchemaValidation> getSchemaValidationList() {
+    return insertRowStatementList.stream()
+        .map(InsertRowStatement::getSchemaValidation)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void updateAfterSchemaValidation() throws QueryProcessException {
+    for (InsertRowStatement insertRowStatement : insertRowStatementList) {
+      insertRowStatement.updateAfterSchemaValidation();
+      if (!this.hasFailedMeasurements() && insertRowStatement.hasFailedMeasurements()) {
+        this.failedMeasurementIndex2Info = insertRowStatement.failedMeasurementIndex2Info;
+      }
+    }
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    return false;
+  }
+
+  @Override
+  public long getMinTime() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Object getFirstValueOfIndex(int index) {
+    throw new NotImplementedException();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
index 2da234041a..cfa864fbbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
@@ -20,15 +20,36 @@ package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
-public class InsertTabletStatement extends InsertBaseStatement {
+public class InsertTabletStatement extends InsertBaseStatement implements ISchemaValidation {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertTabletStatement.class);
+
+  private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
 
   private long[] times; // times should be sorted. It is done in the session API.
   private BitMap[] bitMaps;
@@ -117,4 +138,136 @@ public class InsertTabletStatement extends InsertBaseStatement {
     }
     return ret;
   }
+
+  @Override
+  public ISchemaValidation getSchemaValidation() {
+    return this;
+  }
+
+  @Override
+  public List<ISchemaValidation> getSchemaValidationList() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
+    if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
+      LOGGER.warn(
+          "Inserting to {}.{} : Cast from {} to {}",
+          devicePath,
+          measurements[columnIndex],
+          dataTypes[columnIndex],
+          dataType);
+      columns[columnIndex] =
+          CommonUtils.castArray(dataTypes[columnIndex], dataType, columns[columnIndex]);
+      dataTypes[columnIndex] = dataType;
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void markFailedMeasurement(int index, Exception cause) {
+    if (measurements[index] == null) {
+      return;
+    }
+
+    if (failedMeasurementIndex2Info == null) {
+      failedMeasurementIndex2Info = new HashMap<>();
+    }
+
+    InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo =
+        new InsertBaseStatement.FailedMeasurementInfo(
+            measurements[index], dataTypes[index], columns[index], cause);
+    failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
+
+    measurements[index] = null;
+    dataTypes[index] = null;
+    columns[index] = null;
+  }
+
+  @Override
+  public long getMinTime() {
+    return times[0];
+  }
+
+  @Override
+  public Object getFirstValueOfIndex(int index) {
+    Object value;
+    switch (dataTypes[index]) {
+      case INT32:
+        int[] intValues = (int[]) columns[index];
+        value = intValues[0];
+        break;
+      case INT64:
+        long[] longValues = (long[]) columns[index];
+        value = longValues[0];
+        break;
+      case FLOAT:
+        float[] floatValues = (float[]) columns[index];
+        value = floatValues[0];
+        break;
+      case DOUBLE:
+        double[] doubleValues = (double[]) columns[index];
+        value = doubleValues[0];
+        break;
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) columns[index];
+        value = boolValues[0];
+        break;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) columns[index];
+        value = binaryValues[0];
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(DATATYPE_UNSUPPORTED, dataTypes[index]));
+    }
+    return value;
+  }
+
+  @Override
+  public TSDataType getDataType(int index) {
+    return dataTypes[index];
+  }
+
+  @Override
+  public TSEncoding getEncoding(int index) {
+    return null;
+  }
+
+  @Override
+  public CompressionType getCompressionType(int index) {
+    return null;
+  }
+
+  @Override
+  public void validateDeviceSchema(boolean isAligned) {
+    if (this.isAligned != isAligned) {
+      throw new SemanticException(
+          new AlignedTimeseriesException(
+              String.format(
+                  "timeseries under this device are%s aligned, " + "please use %s interface",
+                  isAligned ? "" : " not", isAligned ? "aligned" : "non-aligned"),
+              devicePath.getFullPath()));
+    }
+  }
+
+  @Override
+  public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
+    if (measurementSchemas == null) {
+      measurementSchemas = new MeasurementSchema[measurements.length];
+    }
+    if (measurementSchemaInfo == null) {
+      measurementSchemas[index] = null;
+    } else {
+      measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
+    }
+
+    try {
+      selfCheckDataTypes(index);
+    } catch (DataTypeMismatchException | PathNotExistException e) {
+      throw new SemanticException(e);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
index 89430bc91f..093aa30e2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoer.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.metadata.idtable.IDTable;
 import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
-import org.apache.iotdb.db.mpp.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -109,9 +108,6 @@ public class TsFilePlanRedoer {
       // TODO get device id by idTable
       // idTable.getSeriesSchemas(node);
     } else {
-      if (!IoTDBDescriptor.getInstance().getConfig().isClusterMode()) {
-        SchemaValidator.validate(node);
-      }
       node.setDeviceID(DeviceIDFactory.getInstance().getDeviceID(node.getDevicePath()));
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index 34b2655e6c..89406f8f58 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -282,11 +282,11 @@ public class DataRegionTest {
             false,
             measurements,
             dataTypes,
+            measurementSchemas,
             times,
             null,
             columns,
             times.length);
-    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -304,11 +304,11 @@ public class DataRegionTest {
             false,
             measurements,
             dataTypes,
+            measurementSchemas,
             times,
             null,
             columns,
             times.length);
-    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -445,11 +445,11 @@ public class DataRegionTest {
             false,
             measurements,
             dataTypes,
+            measurementSchemas,
             times,
             null,
             columns,
             times.length);
-    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -466,11 +466,11 @@ public class DataRegionTest {
             false,
             measurements,
             dataTypes,
+            measurementSchemas,
             times,
             null,
             columns,
             times.length);
-    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -533,11 +533,11 @@ public class DataRegionTest {
             false,
             measurements,
             dataTypes,
+            measurementSchemas,
             times,
             null,
             columns,
             times.length);
-    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -554,11 +554,11 @@ public class DataRegionTest {
             false,
             measurements,
             dataTypes,
+            measurementSchemas,
             times,
             null,
             columns,
             times.length);
-    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -621,11 +621,11 @@ public class DataRegionTest {
             false,
             measurements,
             dataTypes,
+            measurementSchemas,
             times,
             null,
             columns,
             times.length);
-    insertTabletNode1.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode1);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
@@ -642,11 +642,11 @@ public class DataRegionTest {
             false,
             measurements,
             dataTypes,
+            measurementSchemas,
             times,
             null,
             columns,
             times.length);
-    insertTabletNode2.setMeasurementSchemas(measurementSchemas);
 
     dataRegion.insertTablet(insertTabletNode2);
     dataRegion.asyncCloseAllWorkingTsFileProcessors();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
index 8cdfeeba81..7c03f926a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/FakeSchemaFetcherImpl.java
@@ -64,7 +64,12 @@ public class FakeSchemaFetcherImpl implements ISchemaFetcher {
 
   @Override
   public void fetchAndComputeSchemaWithAutoCreate(
-      List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {}
+      List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList) {
+    for (ISchemaComputationWithAutoCreation computation : schemaComputationWithAutoCreationList) {
+      computation.computeMeasurement(
+          0, new SchemaMeasurementNode("s", new MeasurementSchema("s", TSDataType.INT32)));
+    }
+  }
 
   /**
    * Generate the following tree: root.sg.d1.s1, root.sg.d1.s2(status) root.sg.d2.s1,
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
index bb696fdda8..1156021d7f 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/io/WALFileTest.java
@@ -236,18 +236,6 @@ public class WALFileTest {
       }
       bitMaps[i].mark(i % times.length);
     }
-
-    InsertTabletNode insertTabletNode =
-        new InsertTabletNode(
-            new PlanNodeId(""),
-            new PartialPath(devicePath),
-            false,
-            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
-            dataTypes,
-            times,
-            bitMaps,
-            columns,
-            times.length);
     MeasurementSchema[] schemas =
         new MeasurementSchema[] {
           new MeasurementSchema("s1", dataTypes[0]),
@@ -257,9 +245,18 @@ public class WALFileTest {
           new MeasurementSchema("s5", dataTypes[4]),
           new MeasurementSchema("s6", dataTypes[5]),
         };
-    insertTabletNode.setMeasurementSchemas(schemas);
 
-    return insertTabletNode;
+    return new InsertTabletNode(
+        new PlanNodeId(""),
+        new PartialPath(devicePath),
+        false,
+        new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+        dataTypes,
+        schemas,
+        times,
+        bitMaps,
+        columns,
+        times.length);
   }
 
   public static DeleteDataNode getDeleteDataNode(String devicePath) throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
index 517b7aeaf1..508338a6e0 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -697,19 +697,12 @@ public class ConsensusReqReaderTest {
       bitMaps[i].mark(i % times.length);
     }
 
-    InsertTabletNode insertTabletNode =
-        new InsertTabletNode(
-            new PlanNodeId(""),
-            new PartialPath(devicePath),
-            false,
-            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
-            dataTypes,
-            times,
-            bitMaps,
-            columns,
-            times.length);
-
-    insertTabletNode.setMeasurementSchemas(
+    return new InsertTabletNode(
+        new PlanNodeId(""),
+        new PartialPath(devicePath),
+        false,
+        new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+        dataTypes,
         new MeasurementSchema[] {
           new MeasurementSchema("s1", TSDataType.DOUBLE),
           new MeasurementSchema("s2", TSDataType.FLOAT),
@@ -717,9 +710,11 @@ public class ConsensusReqReaderTest {
           new MeasurementSchema("s4", TSDataType.INT32),
           new MeasurementSchema("s5", TSDataType.BOOLEAN),
           new MeasurementSchema("s6", TSDataType.TEXT)
-        });
-
-    return insertTabletNode;
+        },
+        times,
+        bitMaps,
+        columns,
+        times.length);
   }
 
   private DeleteDataNode getDeleteDataNode(String devicePath) throws IllegalPathException {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
index 770a3c7fb2..d2820fbde6 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
@@ -195,24 +195,22 @@ public class WALNodeTest {
       }
       bitMaps[i].mark(i % times.length);
     }
-
-    InsertTabletNode insertTabletNode =
-        new InsertTabletNode(
-            new PlanNodeId(""),
-            new PartialPath(devicePath),
-            false,
-            measurements,
-            dataTypes,
-            times,
-            bitMaps,
-            columns,
-            times.length);
     MeasurementSchema[] schemas = new MeasurementSchema[6];
     for (int i = 0; i < 6; i++) {
       schemas[i] = new MeasurementSchema(measurements[i], dataTypes[i], TSEncoding.PLAIN);
     }
-    insertTabletNode.setMeasurementSchemas(schemas);
-    return insertTabletNode;
+
+    return new InsertTabletNode(
+        new PlanNodeId(""),
+        new PartialPath(devicePath),
+        false,
+        measurements,
+        dataTypes,
+        schemas,
+        times,
+        bitMaps,
+        columns,
+        times.length);
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
index 91843aecf6..dffbf32181 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/TsFilePlanRedoerTest.java
@@ -291,6 +291,7 @@ public class TsFilePlanRedoerTest {
             false,
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
+            null,
             times,
             bitMaps,
             columns,
@@ -392,6 +393,7 @@ public class TsFilePlanRedoerTest {
               TSDataType.FLOAT,
               TSDataType.TEXT
             },
+            null,
             times,
             bitMaps,
             columns,
@@ -473,6 +475,10 @@ public class TsFilePlanRedoerTest {
             false,
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.INT32),
+              new MeasurementSchema("s2", TSDataType.INT64),
+            },
             times,
             null,
             columns,
@@ -520,15 +526,14 @@ public class TsFilePlanRedoerTest {
             false,
             new String[] {"s1", "s2"},
             new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.INT32),
+              new MeasurementSchema("s2", TSDataType.INT64),
+            },
             times,
             null,
             columns,
             times.length);
-    insertTabletNode.setMeasurementSchemas(
-        new MeasurementSchema[] {
-          new MeasurementSchema("s1", TSDataType.INT32),
-          new MeasurementSchema("s2", TSDataType.INT64),
-        });
 
     // redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
     TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false, null);
@@ -634,6 +639,14 @@ public class TsFilePlanRedoerTest {
       // mark value of time=9 as null
       bitMaps[i].mark(3);
     }
+    MeasurementSchema[] schemas =
+        new MeasurementSchema[] {
+          null,
+          new MeasurementSchema("s2", TSDataType.INT64),
+          new MeasurementSchema("s3", TSDataType.BOOLEAN),
+          new MeasurementSchema("s4", TSDataType.FLOAT),
+          null
+        };
     InsertTabletNode insertTabletNode =
         new InsertTabletNode(
             new PlanNodeId(""),
@@ -647,20 +660,13 @@ public class TsFilePlanRedoerTest {
               TSDataType.FLOAT,
               TSDataType.TEXT
             },
+            schemas,
             times,
             bitMaps,
             columns,
             times.length);
     // redo InsertTabletPlan, data region is used to test IdTable, don't test IdTable here
     TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, null);
-    MeasurementSchema[] schemas =
-        new MeasurementSchema[] {
-          null,
-          new MeasurementSchema("s2", TSDataType.INT64),
-          new MeasurementSchema("s3", TSDataType.BOOLEAN),
-          new MeasurementSchema("s4", TSDataType.FLOAT),
-          null
-        };
     insertTabletNode.setMeasurementSchemas(schemas);
     planRedoer.redoInsert(insertTabletNode);
 
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
index d1fe54b54b..95875118cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
@@ -286,7 +286,7 @@ public class UnsealedTsFileRecoverPerformerTest {
             time,
             new Integer[] {1},
             false);
-    insertRowNode.markFailedMeasurement(0, new Exception());
+    insertRowNode.markFailedMeasurement(0);
 
     // generate InsertTabletNode with null
     time = 5;
@@ -297,11 +297,12 @@ public class UnsealedTsFileRecoverPerformerTest {
             false,
             new String[] {"s1"},
             new TSDataType[] {TSDataType.INT64},
+            null,
             new long[] {time},
             null,
             new Integer[] {1},
             1);
-    insertTabletNode.markFailedMeasurement(0, new Exception());
+    insertTabletNode.markFailedMeasurement(0);
 
     int fakeMemTableId = 1;
     WALEntry walEntry1 = new WALInfoEntry(fakeMemTableId++, insertRowNode);