You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/02 11:45:39 UTC

[iotdb] branch master updated: [IOTDB-2748] Writing statement and writing process of coordinator (#5355)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ee6e770  [IOTDB-2748] Writing statement and writing process of coordinator (#5355)
ee6e770 is described below

commit ee6e7700cffe287a5c42affe45c69bee3e2e1518
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Sat Apr 2 19:44:55 2022 +0800

    [IOTDB-2748] Writing statement and writing process of coordinator (#5355)
---
 .../iotdb/commons/partition/DataPartition.java     |  20 +-
 .../commons/partition/DataPartitionQueryParam.java |   4 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  11 ++
 .../db/mpp/common/schematree/PathPatternTree.java  |  13 ++
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |   6 +
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  |  16 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  72 ++++++--
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |   8 +
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   8 +
 .../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java   |   5 +
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |   8 +
 .../db/mpp/sql/parser/StatementGenerator.java      |  52 ++++--
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |  43 ++++-
 .../sql/planner/plan/node/write/InsertNode.java    |  69 ++++++-
 .../sql/planner/plan/node/write/InsertRowNode.java |  48 ++++-
 .../planner/plan/node/write/InsertTabletNode.java  | 198 +++++++++++++++++++-
 .../db/mpp/sql/statement/StatementVisitor.java     |  10 +-
 .../sql/statement/crud/InsertBaseStatement.java    |  59 ++++++
 .../mpp/sql/statement/crud/InsertRowStatement.java | 203 +++++++++++++++++++++
 .../sql/statement/crud/InsertTabletStatement.java  |  96 ++++++++++
 .../iotdb/db/query/control/SessionManager.java     |  15 ++
 .../db/service/thrift/impl/TSServiceImpl.java      |  57 ++++--
 22 files changed, 944 insertions(+), 77 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 7af4aea..3203d27 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,9 +18,7 @@
  */
 package org.apache.iotdb.commons.partition;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class DataPartition {
@@ -50,6 +48,22 @@ public class DataPartition {
         .collect(Collectors.toList());
   }
 
+  public List<RegionReplicaSet> getDataRegionReplicaSetForWriting(
+      String deviceName, List<TimePartitionSlot> timePartitionIdList) {
+    // A list of data region replica sets will store data in a same time partition.
+    // We will insert data to the last set in the list.
+    // TODO return the latest dataRegionReplicaSet for each time partition
+    return Collections.emptyList();
+  }
+
+  public RegionReplicaSet getDataRegionReplicaSetForWriting(
+      String deviceName, TimePartitionSlot timePartitionIdList) {
+    // A list of data region replica sets will store data in a same time partition.
+    // We will insert data to the last set in the list.
+    // TODO return the latest dataRegionReplicaSet for each time partition
+    return null;
+  }
+
   private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
     // TODO: (xingtanzjr) implement the real algorithm for calculation of DeviceGroupId
     return new SeriesPartitionSlot(deviceName.length());
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 8d8ac29..d8bc05e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -33,11 +33,11 @@ public class DataPartitionQueryParam {
     this.devicePath = devicePath;
   }
 
-  public List<TimePartitionSlot> getTimePartitionIdList() {
+  public List<TimePartitionSlot> getTimePartitionSlotList() {
     return timePartitionSlotList;
   }
 
-  public void setTimePartitionIdList(List<TimePartitionSlot> timePartitionSlotList) {
+  public void setTimePartitionSlotList(List<TimePartitionSlot> timePartitionSlotList) {
     this.timePartitionSlotList = timePartitionSlotList;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1e41abb..7c92e42 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.exception.ShutdownException;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -179,6 +180,16 @@ public class StorageEngine implements IService {
     return enablePartition ? time / timePartitionInterval : 0;
   }
 
+  public static TimePartitionSlot getTimePartitionSlot(long time) {
+    TimePartitionSlot timePartitionSlot = new TimePartitionSlot();
+    if (enablePartition) {
+      timePartitionSlot.setStartTime(time - time % timePartitionInterval);
+    } else {
+      timePartitionSlot.setStartTime(0);
+    }
+    return timePartitionSlot;
+  }
+
   public static boolean isEnablePartition() {
     return enablePartition;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
index 6146efa..1b07f67 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/PathPatternTree.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class PathPatternTree {
@@ -42,6 +43,18 @@ public class PathPatternTree {
    */
   protected boolean isPrefixMatchPath;
 
+  public PathPatternTree(PartialPath deivcePath, String[] measurements) {
+    // TODO
+    this.root = new PathPatternNode(SQLConstant.ROOT);
+    this.pathList = new ArrayList<>();
+  };
+
+  public PathPatternTree(Map<PartialPath, List<String>> devices) {
+    // TODO
+    this.root = new PathPatternNode(SQLConstant.ROOT);
+    this.pathList = new ArrayList<>();
+  };
+
   public PathPatternTree() {
     this.root = new PathPatternNode(SQLConstant.ROOT);
     this.pathList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 248602a..5543d3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -47,6 +48,11 @@ public class SchemaTree {
     return new Pair<>(visitor.getAllResult(), visitor.getNextOffset());
   }
 
+  public List<MeasurementSchema> searchMeasurementSchema(
+      PartialPath devicePath, List<String> measurements) {
+    return new ArrayList<>();
+  }
+
   public void serialize(ByteBuffer buffer) throws IOException {
     // TODO
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index d184b82..84f49b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -82,14 +82,6 @@ public class Analysis {
     this.schemaPartition = schemaPartition;
   }
 
-  public SchemaTree getSchemaTree() {
-    return schemaTree;
-  }
-
-  public void setSchemaTree(SchemaTree schemaTree) {
-    this.schemaTree = schemaTree;
-  }
-
   public Map<String, Set<PartialPath>> getDeviceIdToPathsMap() {
     return deviceIdToPathsMap;
   }
@@ -97,4 +89,12 @@ public class Analysis {
   public void setDeviceIdToPathsMap(Map<String, Set<PartialPath>> deviceIdToPathsMap) {
     this.deviceIdToPathsMap = deviceIdToPathsMap;
   }
+
+  public SchemaTree getSchemaTree() {
+    return schemaTree;
+  }
+
+  public void setSchemaTree(SchemaTree schemaTree) {
+    this.schemaTree = schemaTree;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 500676b..ed4da79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.mpp.sql.analyze;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SQLParserException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -39,6 +41,7 @@ import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
@@ -46,13 +49,7 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 /** Analyze the statement and generate Analysis. */
 public class Analyzer {
@@ -196,27 +193,68 @@ public class Analyzer {
     @Override
     public Analysis visitInsertTablet(
         InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
-      // TODO(INSERT) device + time range -> PartitionInfo
       DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+      dataPartitionQueryParam.setTimePartitionSlotList(
+          insertTabletStatement.getTimePartitionSlots());
       dataPartitionQueryParam.setDevicePath(insertTabletStatement.getDevicePath().getFullPath());
-      // TODO(INSERT) calculate the time partition id list
-      //      dataPartitionQueryParam.setTimePartitionIdList();
       PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfo(dataPartitionQueryParam);
 
-      // TODO(INSERT) get each time series schema according to SchemaPartitionInfo in PartitionInfo
-      PathPatternTree patternTree = new PathPatternTree();
-      patternTree.appendPaths(
-          insertTabletStatement.getDevicePath(),
-          Arrays.asList(insertTabletStatement.getMeasurements()));
-      SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
+      SchemaTree schemaTree =
+          IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
+              ? schemaFetcher.fetchSchemaWithAutoCreate(
+                  insertTabletStatement.getDevicePath(),
+                  insertTabletStatement.getMeasurements(),
+                  insertTabletStatement.getDataTypes())
+              : schemaFetcher.fetchSchema(
+                  new PathPatternTree(
+                      insertTabletStatement.getDevicePath(),
+                      insertTabletStatement.getMeasurements()));
 
       Analysis analysis = new Analysis();
       analysis.setSchemaTree(schemaTree);
-      // TODO(INSERT) do type check here
+
+      if (!insertTabletStatement.checkDataType(schemaTree)) {
+        throw new SemanticException("Data type mismatch");
+      }
       analysis.setStatement(insertTabletStatement);
       analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
       analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
       return analysis;
     }
+
+    public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+      DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+      dataPartitionQueryParam.setDevicePath(insertRowStatement.getDevicePath().getFullPath());
+      dataPartitionQueryParam.setTimePartitionSlotList(insertRowStatement.getTimePartitionSlots());
+      PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfo(dataPartitionQueryParam);
+
+      SchemaTree schemaTree =
+          IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
+              ? schemaFetcher.fetchSchemaWithAutoCreate(
+                  insertRowStatement.getDevicePath(),
+                  insertRowStatement.getMeasurements(),
+                  insertRowStatement.getDataTypes())
+              : schemaFetcher.fetchSchema(
+                  new PathPatternTree(
+                      insertRowStatement.getDevicePath(), insertRowStatement.getMeasurements()));
+
+      Analysis analysis = new Analysis();
+      analysis.setSchemaTree(schemaTree);
+
+      try {
+        insertRowStatement.transferType(schemaTree);
+      } catch (QueryProcessException e) {
+        throw new SemanticException(e.getMessage());
+      }
+
+      if (!insertRowStatement.checkDataType(schemaTree)) {
+        throw new SemanticException("Data type mismatch");
+      }
+
+      analysis.setStatement(insertRowStatement);
+      analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
+      analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
+      return analysis;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 0320229..2af452c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class ClusterSchemaFetcher implements ISchemaFetcher {
 
@@ -27,4 +29,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
     return null;
   }
+
+  @Override
+  public SchemaTree fetchSchemaWithAutoCreate(
+      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
index 81e0945..3b803ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakeSchemaFetcherImpl.java
@@ -19,12 +19,20 @@
 
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class FakeSchemaFetcherImpl implements ISchemaFetcher {
   @Override
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
     return new SchemaTree();
   }
+
+  @Override
+  public SchemaTree fetchSchemaWithAutoCreate(
+      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
index 31eaa68..f7b5d34 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 /**
  * This interface is used to fetch the metadata information required in execution plan generating.
@@ -28,4 +30,7 @@ import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 public interface ISchemaFetcher {
 
   SchemaTree fetchSchema(PathPatternTree patternTree);
+
+  SchemaTree fetchSchemaWithAutoCreate(
+      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index 767217f..f6f10e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -18,8 +18,10 @@
  */
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
@@ -33,4 +35,10 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
     return null;
   }
+
+  @Override
+  public SchemaTree fetchSchemaWithAutoCreate(
+      PartialPath devicePath, String[] measurements, TSDataType[] tsDataTypes) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
index 5ffb513..1bbd2b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/StatementGenerator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.parser;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.filter.BasicFunctionFilter;
 import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
@@ -30,18 +31,14 @@ import org.apache.iotdb.db.mpp.sql.statement.component.FromComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
-import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.LastQueryStatement;
-import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.*;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
 import org.apache.iotdb.db.qp.sql.SqlLexer;
 import org.apache.iotdb.db.qp.strategy.SQLParseError;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
-import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
+import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.service.rpc.thrift.*;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -145,16 +142,43 @@ public class StatementGenerator {
   }
 
   public static Statement createStatement(TSInsertRecordReq insertRecordReq)
-      throws IllegalPathException {
+      throws IllegalPathException, QueryProcessException {
     // construct insert statement
-    InsertStatement insertStatement = new InsertStatement();
-    insertStatement.setDevice(new PartialPath(insertRecordReq.getPrefixPath()));
-    insertStatement.setTimes(new long[] {insertRecordReq.getTimestamp()});
+    InsertRowStatement insertStatement = new InsertRowStatement();
+    insertStatement.setDevicePath(new PartialPath(insertRecordReq.getPrefixPath()));
+    insertStatement.setTime(insertRecordReq.getTimestamp());
 
-    // TODO: set values after unifying SQL and RPC requests
     // insertStatement.setValuesList(insertRecordReq.getValues());
-    insertStatement.setMeasurementList(insertRecordReq.getMeasurements().toArray(new String[0]));
-    insertStatement.setAligned(insertStatement.isAligned());
+    insertStatement.fillValues(insertRecordReq.values);
+    insertStatement.setMeasurements(insertRecordReq.getMeasurements().toArray(new String[0]));
+    insertStatement.setAligned(insertRecordReq.isAligned);
+    return insertStatement;
+  }
+
+  public static Statement createStatement(TSInsertTabletReq insertTabletReq)
+      throws IllegalPathException {
+    // construct insert statement
+    InsertTabletStatement insertStatement = new InsertTabletStatement();
+    insertStatement.setDevicePath(new PartialPath(insertTabletReq.getPrefixPath()));
+    insertStatement.setMeasurements(insertTabletReq.getMeasurements().toArray(new String[0]));
+    insertStatement.setTimes(
+        QueryDataSetUtils.readTimesFromBuffer(insertTabletReq.timestamps, insertTabletReq.size));
+    insertStatement.setColumns(
+        QueryDataSetUtils.readValuesFromBuffer(
+            insertTabletReq.values,
+            insertTabletReq.types,
+            insertTabletReq.types.size(),
+            insertTabletReq.size));
+    insertStatement.setBitMaps(
+        QueryDataSetUtils.readBitMapsFromBuffer(
+            insertTabletReq.values, insertTabletReq.types.size(), insertTabletReq.size));
+    insertStatement.setRowCount(insertTabletReq.size);
+    TSDataType[] dataTypes = new TSDataType[insertTabletReq.types.size()];
+    for (int i = 0; i < insertTabletReq.types.size(); i++) {
+      dataTypes[i] = TSDataType.values()[insertTabletReq.types.get(i)];
+    }
+    insertStatement.setDataTypes(dataTypes);
+    insertStatement.setAligned(insertTabletReq.isAligned);
     return insertStatement;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index 379a90f..a753f45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -30,17 +30,20 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAligne
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.*;
 import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.FillQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.*;
 import java.util.stream.Collectors;
@@ -278,10 +281,44 @@ public class LogicalPlanner {
     @Override
     public PlanNode visitInsertTablet(
         InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
-      // TODO(INSERT) change the InsertTabletStatement to InsertTabletNode
-      InsertTabletNode node = new InsertTabletNode(PlanNodeIdAllocator.generateId());
+      // set schema in insert node
+      // convert insert statement to insert node
+      List<MeasurementSchema> measurementSchemas =
+          analysis
+              .getSchemaTree()
+              .searchMeasurementSchema(
+                  insertTabletStatement.getDevicePath(),
+                  Arrays.asList(insertTabletStatement.getMeasurements()));
+      return new InsertTabletNode(
+          PlanNodeIdAllocator.generateId(),
+          insertTabletStatement.getDevicePath(),
+          insertTabletStatement.isAligned(),
+          measurementSchemas.toArray(new MeasurementSchema[0]),
+          insertTabletStatement.getDataTypes(),
+          insertTabletStatement.getTimes(),
+          insertTabletStatement.getBitMaps(),
+          insertTabletStatement.getColumns(),
+          insertTabletStatement.getRowCount());
+    }
 
-      return node;
+    @Override
+    public PlanNode visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+      // set schema in insert node
+      // convert insert statement to insert node
+      List<MeasurementSchema> measurementSchemas =
+          analysis
+              .getSchemaTree()
+              .searchMeasurementSchema(
+                  insertRowStatement.getDevicePath(),
+                  Arrays.asList(insertRowStatement.getMeasurements()));
+      return new InsertRowNode(
+          PlanNodeIdAllocator.generateId(),
+          insertRowStatement.getDevicePath(),
+          insertRowStatement.isAligned(),
+          measurementSchemas.toArray(new MeasurementSchema[0]),
+          insertRowStatement.getDataTypes(),
+          insertRowStatement.getTime(),
+          insertRowStatement.getValues());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 2f79538..96ccd48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
@@ -49,18 +50,78 @@ public abstract class InsertNode extends PlanNode {
    */
   protected IDeviceID deviceID;
 
+  /** Physical address of data region after splitting */
+  RegionReplicaSet dataRegionReplicaSet;
+
   protected InsertNode(PlanNodeId id) {
     super(id);
   }
 
+  protected InsertNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      MeasurementSchema[] measurements,
+      TSDataType[] dataTypes) {
+    super(id);
+    this.devicePath = devicePath;
+    this.isAligned = isAligned;
+    this.measurements = measurements;
+    this.dataTypes = dataTypes;
+  }
+
+  public RegionReplicaSet getDataRegionReplicaSet() {
+    return dataRegionReplicaSet;
+  }
+
+  public void setDataRegionReplicaSet(RegionReplicaSet dataRegionReplicaSet) {
+    this.dataRegionReplicaSet = dataRegionReplicaSet;
+  }
+
+  public PartialPath getDevicePath() {
+    return devicePath;
+  }
+
+  public void setDevicePath(PartialPath devicePath) {
+    this.devicePath = devicePath;
+  }
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  public void setAligned(boolean aligned) {
+    isAligned = aligned;
+  }
+
+  public MeasurementSchema[] getMeasurements() {
+    return measurements;
+  }
+
+  public void setMeasurements(MeasurementSchema[] measurements) {
+    this.measurements = measurements;
+  }
+
+  public TSDataType[] getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(TSDataType[] dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public IDeviceID getDeviceID() {
+    return deviceID;
+  }
+
+  public void setDeviceID(IDeviceID deviceID) {
+    this.deviceID = deviceID;
+  }
+
   // TODO(INSERT) split this insert node into multiple InsertNode according to the data partition
   // info
   public abstract List<InsertNode> splitByPartition(Analysis analysis);
 
-  public boolean needSplit() {
-    return true;
-  }
-
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index e881ff1..72fe657 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -18,23 +18,50 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
 public class InsertRowNode extends InsertNode {
 
-  protected InsertRowNode(PlanNodeId id) {
+  private long time;
+  private Object[] values;
+
+  public InsertRowNode(PlanNodeId id) {
     super(id);
   }
 
+  public InsertRowNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      MeasurementSchema[] measurements,
+      TSDataType[] dataTypes,
+      long time,
+      Object[] values) {
+    super(id, devicePath, isAligned, measurements, dataTypes);
+    this.time = time;
+    this.values = values;
+  }
+
   @Override
   public List<InsertNode> splitByPartition(Analysis analysis) {
-    return null;
+    TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(time);
+    this.dataRegionReplicaSet =
+        analysis
+            .getDataPartitionInfo()
+            .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlot);
+    return Collections.singletonList(this);
   }
 
   @Override
@@ -67,8 +94,19 @@ public class InsertRowNode extends InsertNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  @Override
-  public boolean needSplit() {
-    return false;
+  public Object[] getValues() {
+    return values;
+  }
+
+  public void setValues(Object[] values) {
+    this.values = values;
+  }
+
+  public long getTime() {
+    return time;
+  }
+
+  public void setTime(long time) {
+    this.time = time;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index f1e4744..eab6460 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -18,14 +18,21 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
 
 public class InsertTabletNode extends InsertNode {
 
@@ -34,8 +41,8 @@ public class InsertTabletNode extends InsertNode {
   private BitMap[] bitMaps;
   private Object[] columns;
 
-  private int start;
-  private int end;
+  private int rowCount = 0;
+
   // when this plan is sub-plan split from another InsertTabletPlan, this indicates the original
   // positions of values in
   // this plan. For example, if the plan contains 5 timestamps, and range = [1,4,10,12], then it
@@ -51,6 +58,63 @@ public class InsertTabletNode extends InsertNode {
     super(id);
   }
 
+  public InsertTabletNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      MeasurementSchema[] measurements,
+      TSDataType[] dataTypes,
+      long[] times,
+      BitMap[] bitMaps,
+      Object[] columns,
+      int rowCount) {
+    super(id, devicePath, isAligned, measurements, dataTypes);
+    this.times = times;
+    this.bitMaps = bitMaps;
+    this.columns = columns;
+    this.rowCount = rowCount;
+  }
+
+  public long[] getTimes() {
+    return times;
+  }
+
+  public void setTimes(long[] times) {
+    this.times = times;
+  }
+
+  public BitMap[] getBitMaps() {
+    return bitMaps;
+  }
+
+  public void setBitMaps(BitMap[] bitMaps) {
+    this.bitMaps = bitMaps;
+  }
+
+  public Object[] getColumns() {
+    return columns;
+  }
+
+  public void setColumns(Object[] columns) {
+    this.columns = columns;
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public void setRowCount(int rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  public List<Integer> getRange() {
+    return range;
+  }
+
+  public void setRange(List<Integer> range) {
+    this.range = range;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return null;
@@ -83,6 +147,132 @@ public class InsertTabletNode extends InsertNode {
 
   @Override
   public List<InsertNode> splitByPartition(Analysis analysis) {
-    return null;
+    // only single device in single storage group
+    List<InsertNode> result = new ArrayList<>();
+    if (times.length == 0) {
+      return Collections.emptyList();
+    }
+    long startTime =
+        (times[0] / StorageEngine.getTimePartitionInterval())
+            * StorageEngine.getTimePartitionInterval(); // included
+    long endTime = startTime + StorageEngine.getTimePartitionInterval(); // excluded
+    TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(times[0]);
+    int startLoc = 0; // included
+
+    List<TimePartitionSlot> timePartitionSlots = new ArrayList<>();
+    // for each List in split, they are range1.start, range1.end, range2.start, range2.end, ...
+    List<Integer> ranges = new ArrayList<>();
+    for (int i = 1; i < times.length; i++) { // times are sorted in session API.
+      if (times[i] >= endTime) {
+        // a new range.
+        ranges.add(startLoc); // included
+        ranges.add(i); // excluded
+        timePartitionSlots.add(timePartitionSlot);
+        // next init
+        startLoc = i;
+        startTime = endTime;
+        endTime =
+            (times[i] / StorageEngine.getTimePartitionInterval() + 1)
+                * StorageEngine.getTimePartitionInterval();
+        timePartitionSlot = StorageEngine.getTimePartitionSlot(times[i]);
+      }
+    }
+
+    // the final range
+    ranges.add(startLoc); // included
+    ranges.add(times.length); // excluded
+    timePartitionSlots.add(timePartitionSlot);
+
+    // data region for each time partition
+    List<RegionReplicaSet> dataRegionReplicaSets =
+        analysis
+            .getDataPartitionInfo()
+            .getDataRegionReplicaSetForWriting(devicePath.getFullPath(), timePartitionSlots);
+
+    Map<RegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
+    for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
+      List<Integer> sub_ranges =
+          splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new ArrayList<>());
+      sub_ranges.add(ranges.get(i));
+      sub_ranges.add(ranges.get(i + 1));
+    }
+
+    List<Integer> locs;
+    for (Map.Entry<RegionReplicaSet, List<Integer>> entry : splitMap.entrySet()) {
+      // generate a new times and values
+      locs = entry.getValue();
+      int count = 0;
+      for (int i = 0; i < locs.size(); i += 2) {
+        int start = locs.get(i);
+        int end = locs.get(i + 1);
+        count += end - start;
+      }
+      long[] subTimes = new long[count];
+      int destLoc = 0;
+      Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
+      BitMap[] bitMaps = this.bitMaps == null ? null : initBitmaps(dataTypes.length, count);
+      for (int i = 0; i < locs.size(); i += 2) {
+        int start = locs.get(i);
+        int end = locs.get(i + 1);
+        System.arraycopy(times, start, subTimes, destLoc, end - start);
+        for (int k = 0; k < values.length; k++) {
+          System.arraycopy(columns[k], start, values[k], destLoc, end - start);
+          if (bitMaps != null && this.bitMaps[k] != null) {
+            BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, end - start);
+          }
+        }
+        destLoc += end - start;
+      }
+      InsertTabletNode subNode =
+          new InsertTabletNode(
+              getId(),
+              devicePath,
+              isAligned,
+              measurements,
+              dataTypes,
+              subTimes,
+              bitMaps,
+              values,
+              subTimes.length);
+      subNode.setRange(locs);
+      subNode.setDataRegionReplicaSet(entry.getKey());
+      result.add(subNode);
+    }
+    return result;
+  }
+
+  private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[] dataTypes) {
+    Object[] values = new Object[columnSize];
+    for (int i = 0; i < values.length; i++) {
+      switch (dataTypes[i]) {
+        case TEXT:
+          values[i] = new Binary[rowSize];
+          break;
+        case FLOAT:
+          values[i] = new float[rowSize];
+          break;
+        case INT32:
+          values[i] = new int[rowSize];
+          break;
+        case INT64:
+          values[i] = new long[rowSize];
+          break;
+        case DOUBLE:
+          values[i] = new double[rowSize];
+          break;
+        case BOOLEAN:
+          values[i] = new boolean[rowSize];
+          break;
+      }
+    }
+    return values;
+  }
+
+  private BitMap[] initBitmaps(int columnSize, int rowSize) {
+    BitMap[] bitMaps = new BitMap[columnSize];
+    for (int i = 0; i < columnSize; i++) {
+      bitMaps[i] = new BitMap(rowSize);
+    }
+    return bitMaps;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
index 8aa8331..3fd959c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/StatementVisitor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.sql.statement;
 
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
@@ -27,8 +28,9 @@ import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesSta
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 
 /**
- * This class provides a visitor of {@link StatementNode}, which can be extended to create a visitor
- * which only needs to handle a subset of the available methods.
+ * This class provides a visitor of {@link org.apache.iotdb.db.mpp.sql.statement.StatementNode},
+ * which can be extended to create a visitor which only needs to handle a subset of the available
+ * methods.
  *
  * @param <R> The return type of the visit operation.
  * @param <C> The context information during visiting.
@@ -85,4 +87,8 @@ public abstract class StatementVisitor<R, C> {
   public R visitInsertTablet(InsertTabletStatement insertTabletStatement, C context) {
     return visitStatement(insertTabletStatement, context);
   }
+
+  public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
+    return visitStatement(insertRowStatement, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
index 8b70ccc..271a70d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
@@ -18,10 +18,15 @@
  */
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public abstract class InsertBaseStatement extends Statement {
 
   /**
@@ -31,15 +36,69 @@ public abstract class InsertBaseStatement extends Statement {
   protected PartialPath devicePath;
 
   protected boolean isAligned;
+
   protected String[] measurements;
   // get from client
   protected TSDataType[] dataTypes;
 
+  // record the failed measurements, their reasons, and positions in "measurements"
+  List<String> failedMeasurements;
+
   public PartialPath getDevicePath() {
     return devicePath;
   }
 
+  public void setDevicePath(PartialPath devicePath) {
+    this.devicePath = devicePath;
+  }
+
   public String[] getMeasurements() {
     return measurements;
   }
+
+  public void setMeasurements(String[] measurements) {
+    this.measurements = measurements;
+  }
+
+  public TSDataType[] getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(TSDataType[] dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  public void setAligned(boolean aligned) {
+    isAligned = aligned;
+  }
+
+  public List<String> getFailedMeasurements() {
+    return failedMeasurements;
+  }
+
+  public abstract List<TimePartitionSlot> getTimePartitionSlots();
+
+  public abstract boolean checkDataType(SchemaTree schemaTree);
+
+  /**
+   * This method is overrided in InsertRowPlan and InsertTabletPlan. After marking failed
+   * measurements, the failed values or columns would be null as well. We'd better use
+   * "measurements[index] == null" to determine if the measurement failed.
+   *
+   * @param index failed measurement index
+   */
+  public void markFailedMeasurementInsertion(int index, Exception e) {
+    if (measurements[index] == null) {
+      return;
+    }
+    if (failedMeasurements == null) {
+      failedMeasurements = new ArrayList<>();
+    }
+    failedMeasurements.add(measurements[index]);
+    measurements[index] = null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
new file mode 100644
index 0000000..1c6f3e3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertRowStatement.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.statement.crud;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
+import org.apache.iotdb.db.mpp.sql.constant.StatementType;
+import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class InsertRowStatement extends InsertBaseStatement {
+  private static final Logger logger = LoggerFactory.getLogger(InsertRowStatement.class);
+
+  private static final byte TYPE_RAW_STRING = -1;
+  private static final byte TYPE_NULL = -2;
+
+  private long time;
+  private Object[] values;
+  private boolean isNeedInferType = false;
+
+  public InsertRowStatement() {
+    super();
+    statementType = StatementType.INSERT;
+  }
+
+  public long getTime() {
+    return time;
+  }
+
+  public void setTime(long time) {
+    this.time = time;
+  }
+
+  public Object[] getValues() {
+    return values;
+  }
+
+  public void setValues(Object[] values) {
+    this.values = values;
+  }
+
+  public boolean isNeedInferType() {
+    return isNeedInferType;
+  }
+
+  public void setNeedInferType(boolean needInferType) {
+    isNeedInferType = needInferType;
+  }
+
+  public void fillValues(ByteBuffer buffer) throws QueryProcessException {
+    this.values = new Object[measurements.length];
+    for (int i = 0; i < dataTypes.length; i++) {
+      // types are not determined, the situation mainly occurs when the plan uses string values
+      // and is forwarded to other nodes
+      byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
+      if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
+        values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(buffer) : null;
+        continue;
+      }
+      dataTypes[i] = TSDataType.values()[typeNum];
+      switch (dataTypes[i]) {
+        case BOOLEAN:
+          values[i] = ReadWriteIOUtils.readBool(buffer);
+          break;
+        case INT32:
+          values[i] = ReadWriteIOUtils.readInt(buffer);
+          break;
+        case INT64:
+          values[i] = ReadWriteIOUtils.readLong(buffer);
+          break;
+        case FLOAT:
+          values[i] = ReadWriteIOUtils.readFloat(buffer);
+          break;
+        case DOUBLE:
+          values[i] = ReadWriteIOUtils.readDouble(buffer);
+          break;
+        case TEXT:
+          values[i] = ReadWriteIOUtils.readBinary(buffer);
+          break;
+        default:
+          throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
+      }
+    }
+  }
+
+  /**
+   * if inferType is true, transfer String[] values to specific data types (Integer, Long, Float,
+   * Double, Binary)
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  public void transferType(SchemaTree schemaTree) throws QueryProcessException {
+    List<MeasurementSchema> measurementSchemas =
+        schemaTree.searchMeasurementSchema(devicePath, Arrays.asList(measurements));
+    if (isNeedInferType) {
+      for (int i = 0; i < measurementSchemas.size(); i++) {
+        if (measurementSchemas.get(i) == null) {
+          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+            markFailedMeasurementInsertion(
+                i,
+                new QueryProcessException(
+                    new PathNotExistException(
+                        devicePath.getFullPath()
+                            + IoTDBConstant.PATH_SEPARATOR
+                            + measurements[i])));
+          } else {
+            throw new QueryProcessException(
+                new PathNotExistException(
+                    devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
+          }
+          continue;
+        }
+
+        dataTypes[i] = measurementSchemas.get(i).getType();
+        try {
+          values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
+        } catch (Exception e) {
+          logger.warn(
+              "{}.{} data type is not consistent, input {}, registered {}",
+              devicePath,
+              measurements[i],
+              values[i],
+              dataTypes[i]);
+          if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+            markFailedMeasurementInsertion(i, e);
+          } else {
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void markFailedMeasurementInsertion(int index, Exception e) {
+    if (measurements[index] == null) {
+      return;
+    }
+    super.markFailedMeasurementInsertion(index, e);
+    values[index] = null;
+    dataTypes[index] = null;
+  }
+
+  @Override
+  public List<TimePartitionSlot> getTimePartitionSlots() {
+    return Collections.singletonList(StorageEngine.getTimePartitionSlot(time));
+  }
+
+  public boolean checkDataType(SchemaTree schemaTree) {
+    List<MeasurementSchema> measurementSchemas =
+        schemaTree.searchMeasurementSchema(devicePath, Arrays.asList(measurements));
+    for (int i = 0; i < measurementSchemas.size(); i++) {
+      if (dataTypes[i] != measurementSchemas.get(i).getType()) {
+        if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          return false;
+        } else {
+          markFailedMeasurementInsertion(
+              i,
+              new DataTypeMismatchException(
+                  measurements[i], measurementSchemas.get(i).getType(), dataTypes[i]));
+        }
+      }
+    }
+    return true;
+  }
+
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitInsertRow(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index 9a97d66..b6d3a46 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -18,11 +18,107 @@
  */
 package org.apache.iotdb.db.mpp.sql.statement.crud;
 
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 public class InsertTabletStatement extends InsertBaseStatement {
 
   private long[] times; // times should be sorted. It is done in the session API.
   private BitMap[] bitMaps;
   private Object[] columns;
+
+  private int rowCount = 0;
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public void setRowCount(int rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  public Object[] getColumns() {
+    return columns;
+  }
+
+  public void setColumns(Object[] columns) {
+    this.columns = columns;
+  }
+
+  public BitMap[] getBitMaps() {
+    return bitMaps;
+  }
+
+  public void setBitMaps(BitMap[] bitMaps) {
+    this.bitMaps = bitMaps;
+  }
+
+  public long[] getTimes() {
+    return times;
+  }
+
+  public void setTimes(long[] times) {
+    this.times = times;
+  }
+
+  @Override
+  public void markFailedMeasurementInsertion(int index, Exception e) {
+    if (measurements[index] == null) {
+      return;
+    }
+    super.markFailedMeasurementInsertion(index, e);
+    dataTypes[index] = null;
+    times[index] = Long.MAX_VALUE;
+    columns[index] = null;
+  }
+
+  @Override
+  public List<TimePartitionSlot> getTimePartitionSlots() {
+    List<TimePartitionSlot> result = new ArrayList<>();
+    long startTime =
+        (times[0] / StorageEngine.getTimePartitionInterval())
+            * StorageEngine.getTimePartitionInterval(); // included
+    long endTime = startTime + StorageEngine.getTimePartitionInterval(); // excluded
+    TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(times[0]);
+    for (int i = 1; i < times.length; i++) { // times are sorted in session API.
+      if (times[i] >= endTime) {
+        result.add(timePartitionSlot);
+        // next init
+        endTime =
+            (times[i] / StorageEngine.getTimePartitionInterval() + 1)
+                * StorageEngine.getTimePartitionInterval();
+        timePartitionSlot = StorageEngine.getTimePartitionSlot(times[i]);
+      }
+    }
+    result.add(timePartitionSlot);
+    return result;
+  }
+
+  @Override
+  public boolean checkDataType(SchemaTree schemaTree) {
+    List<MeasurementSchema> measurementSchemas =
+        schemaTree.searchMeasurementSchema(devicePath, Arrays.asList(measurements));
+    for (int i = 0; i < measurementSchemas.size(); i++) {
+      if (dataTypes[i] != measurementSchemas.get(i).getType()) {
+        if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+          return false;
+        } else {
+          markFailedMeasurementInsertion(
+              i,
+              new DataTypeMismatchException(
+                  measurements[i], measurementSchemas.get(i).getType(), dataTypes[i]));
+        }
+      }
+    }
+    return true;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 3815a89..3d7a8f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -162,6 +162,21 @@ public class SessionManager {
     return SessionTimeoutManager.getInstance().unregister(sessionId);
   }
 
+  /**
+   * Check whether current user has logged in.
+   *
+   * @return true: If logged in; false: If not logged in
+   */
+  public boolean checkLogin(long sessionId) {
+    boolean isLoggedIn = getUsername(sessionId) != null;
+    if (!isLoggedIn) {
+      LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME);
+    } else {
+      SessionTimeoutManager.getInstance().refresh(sessionId);
+    }
+    return isLoggedIn;
+  }
+
   public long requestSessionId(
       String username, String zoneId, IoTDBConstant.ClientVersion clientVersion) {
     long sessionId = sessionIdGenerator.incrementAndGet();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index b4cbd67..3156206 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -39,6 +39,8 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.Coordinator;
 import org.apache.iotdb.db.mpp.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -1519,6 +1521,43 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
+  public TSStatus insertRecordV2(TSInsertRecordReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      AUDIT_LOGGER.debug(
+          "Session {} insertRecord, device {}, time {}",
+          SESSION_MANAGER.getCurrSessionId(),
+          req.getPrefixPath(),
+          req.getTimestamp());
+
+      InsertRowStatement statement = (InsertRowStatement) StatementGenerator.createStatement(req);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          coordinator.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              QueryType.WRITE,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
   @Override
   public TSStatus insertStringRecord(TSInsertStringRecordReq req) {
     try {
@@ -1611,25 +1650,13 @@ public class TSServiceImpl implements TSIService.Iface {
   public TSStatus insertTabletV2(TSInsertTabletReq req) {
     long t1 = System.currentTimeMillis();
     try {
-      // TODO (xingtanzjr) move this method to SESSION_MANAGER
-      if (!serviceProvider.checkLogin(req.getSessionId())) {
+      if (!SESSION_MANAGER.checkLogin(req.getSessionId())) {
         return getNotLoggedInStatus();
       }
 
       // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
-      InsertTabletStatement statement = new InsertTabletStatement();
-      //      InsertTabletPlan insertTabletPlan =
-      //          new InsertTabletPlan(new PartialPath(req.getPrefixPath()), req.measurements);
-      //      insertTabletPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps,
-      // req.size));
-      //      insertTabletPlan.setColumns(
-      //          QueryDataSetUtils.readValuesFromBuffer(
-      //              req.values, req.types, req.types.size(), req.size));
-      //      insertTabletPlan.setBitMaps(
-      //          QueryDataSetUtils.readBitMapsFromBuffer(req.values, req.types.size(), req.size));
-      //      insertTabletPlan.setRowCount(req.size);
-      //      insertTabletPlan.setDataTypes(req.types);
-      //      insertTabletPlan.setAligned(req.isAligned);
+      InsertTabletStatement statement =
+          (InsertTabletStatement) StatementGenerator.createStatement(req);
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId(false);