You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/02/23 11:44:30 UTC

[iotdb] branch master updated: [IOTDB-1163]optimize the insertRecords session interface for cluster version (#2698)

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b4544d8  [IOTDB-1163]optimize the insertRecords session interface for cluster version (#2698)
b4544d8 is described below

commit b4544d8fc55cd8fa2005e14b5cfe797ba376d294
Author: HouliangQi <ne...@163.com>
AuthorDate: Tue Feb 23 19:44:07 2021 +0800

    [IOTDB-1163]optimize the insertRecords session interface for cluster version (#2698)
---
 .../iotdb/cluster/coordinator/Coordinator.java     |  33 +++-
 .../cluster/log/applier/AsyncDataLogApplier.java   |   8 +
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  11 ++
 .../apache/iotdb/cluster/metadata/CMManager.java   |  34 +++-
 .../iotdb/cluster/query/ClusterPlanRouter.java     |  35 +++-
 .../apache/iotdb/db/qp/executor/IPlanExecutor.java |   8 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  27 +++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   3 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   8 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  10 +-
 .../physical/crud/InsertRowsOfOneDevicePlan.java   |   4 +-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  | 219 +++++++++++++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  87 +++++---
 13 files changed, 441 insertions(+), 46 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 0ad14f4..f319dc6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -306,8 +307,10 @@ public class Coordinator {
     } else {
       if (plan instanceof InsertTabletPlan
           || plan instanceof InsertMultiTabletPlan
-          || plan instanceof CreateMultiTimeSeriesPlan) {
-        // InsertTabletPlan, InsertMultiTabletPlan and CreateMultiTimeSeriesPlan contains many rows,
+          || plan instanceof CreateMultiTimeSeriesPlan
+          || plan instanceof InsertRowsPlan) {
+        // InsertTabletPlan, InsertMultiTabletPlan, InsertRowsPlan and CreateMultiTimeSeriesPlan
+        // contains many rows,
         // each will correspond to a TSStatus as its execution result,
         // as the plan is split and the sub-plans may have interleaving ranges,
         // we must assure that each TSStatus is placed to the right position
@@ -455,8 +458,11 @@ public class Coordinator {
           // and the second dimension is the number of rows per InsertTabletPlan
           totalRowNum = ((InsertMultiTabletPlan) parentPlan).getTabletsSize();
         } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
-          totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size();
+          totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getPaths().size();
+        } else if (parentPlan instanceof InsertRowsPlan) {
+          totalRowNum = ((InsertRowsPlan) parentPlan).getRowCount();
         }
+
         if (subStatus == null) {
           subStatus = new TSStatus[totalRowNum];
           Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
@@ -492,8 +498,14 @@ public class Coordinator {
           for (int i = 0; i < subPlan.getIndexes().size(); i++) {
             subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i);
           }
+        } else if (parentPlan instanceof InsertRowsPlan) {
+          InsertRowsPlan subPlan = (InsertRowsPlan) entry.getKey();
+          for (int i = 0; i < subPlan.getInsertRowPlanIndexList().size(); i++) {
+            subStatus[subPlan.getInsertRowPlanIndexList().get(i)] = tmpStatus.subStatus.get(i);
+          }
         }
       }
+
       if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         // execution failed, record the error message
         errorCodePartitionGroups.add(
@@ -549,6 +561,21 @@ public class Coordinator {
         subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
       }
     }
+
+    if (parentPlan instanceof InsertRowsPlan
+        && !((InsertRowsPlan) parentPlan).getResults().isEmpty()) {
+      if (subStatus == null) {
+        subStatus = new TSStatus[totalRowNum];
+        Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+      }
+      noFailure = false;
+      isBatchFailure = true;
+      for (Map.Entry<Integer, TSStatus> integerTSStatusEntry :
+          ((InsertRowsPlan) parentPlan).getResults().entrySet()) {
+        subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
+      }
+    }
+
     return concludeFinalStatus(
         noFailure, endPoint, isBatchFailure, subStatus, errorCodePartitionGroups);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 8c4c2eb..65a5b2d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.service.IoTDB;
 
@@ -142,6 +143,10 @@ public class AsyncDataLogApplier implements LogApplier {
    * same. this is done in {@link
    * org.apache.iotdb.cluster.query.ClusterPlanRouter#splitAndRoutePlan(InsertMultiTabletPlan)}
    *
+   * <p>We can also sure that the storage group of all InsertRowPlans in InsertRowsPlan are the
+   * same. this is done in {@link
+   * org.apache.iotdb.cluster.query.ClusterPlanRouter#splitAndRoutePlan(InsertRowsPlan)}
+   *
    * @return the sg that the plan belongs to
    * @throws StorageGroupNotSetException if no sg found
    */
@@ -150,6 +155,9 @@ public class AsyncDataLogApplier implements LogApplier {
     if (plan instanceof InsertMultiTabletPlan) {
       PartialPath deviceId = ((InsertMultiTabletPlan) plan).getFirstDeviceId();
       sgPath = IoTDB.metaManager.getStorageGroupPath(deviceId);
+    } else if (plan instanceof InsertRowsPlan) {
+      PartialPath path = ((InsertRowsPlan) plan).getFirstDeviceId();
+      sgPath = IoTDB.metaManager.getStorageGroupPath(path);
     } else if (plan instanceof InsertPlan) {
       PartialPath deviceId = ((InsertPlan) plan).getDeviceId();
       sgPath = IoTDB.metaManager.getStorageGroupPath(deviceId);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 5eecc7d..7d11d3e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -37,6 +37,8 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.service.IoTDB;
 
@@ -67,6 +69,8 @@ public class DataLogApplier extends BaseApplier {
         PhysicalPlan plan = physicalPlanLog.getPlan();
         if (plan instanceof InsertMultiTabletPlan) {
           applyInsert((InsertMultiTabletPlan) plan);
+        } else if (plan instanceof InsertRowsPlan) {
+          applyInsert((InsertRowsPlan) plan);
         } else if (plan instanceof InsertPlan) {
           applyInsert((InsertPlan) plan);
         } else {
@@ -101,6 +105,13 @@ public class DataLogApplier extends BaseApplier {
     }
   }
 
+  private void applyInsert(InsertRowsPlan plan)
+      throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
+    for (InsertRowPlan insertRowPlan : plan.getInsertRowPlanList()) {
+      applyInsert(insertRowPlan);
+    }
+  }
+
   private void applyInsert(InsertPlan plan)
       throws StorageGroupNotSetException, QueryProcessException, StorageEngineException {
     // check if the corresponding slot is being pulled
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 50c3732..e3a58f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -431,8 +432,12 @@ public class CMManager extends MManager {
   public void createSchema(PhysicalPlan plan) throws MetadataException, CheckConsistencyException {
     // try to set storage group
     List<PartialPath> deviceIds;
-    // only handle InsertPlan, CreateTimeSeriesPlan and CreateMultiTimeSeriesPlan currently
-    if (plan instanceof InsertPlan && !(plan instanceof InsertMultiTabletPlan)) {
+    // only handle InsertPlan, CreateTimeSeriesPlan and CreateMultiTimeSeriesPlan currently,
+    if (plan instanceof InsertPlan
+        && !(plan instanceof InsertMultiTabletPlan)
+        && !(plan instanceof InsertRowsPlan)) {
+      // InsertMultiTabletPlan and InsertRowsPlan have multiple devices, and other types of
+      // InsertPlan have only one device.
       deviceIds = Collections.singletonList(((InsertPlan) plan).getDeviceId());
     } else if (plan instanceof CreateTimeSeriesPlan) {
       deviceIds = Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath());
@@ -528,7 +533,26 @@ public class CMManager extends MManager {
       boolean success = createTimeseries(insertTabletPlan);
       allSuccess = allSuccess && success;
       if (!success) {
-        logger.error("create timeseries for device={} failed", insertTabletPlan.getDeviceId());
+        logger.error(
+            "create timeseries for device={} failed, plan={}",
+            insertTabletPlan.getDeviceId(),
+            insertTabletPlan);
+      }
+    }
+    return allSuccess;
+  }
+
+  public boolean createTimeseries(InsertRowsPlan insertRowsPlan)
+      throws CheckConsistencyException, IllegalPathException {
+    boolean allSuccess = true;
+    for (InsertRowPlan insertRowPlan : insertRowsPlan.getInsertRowPlanList()) {
+      boolean success = createTimeseries(insertRowPlan);
+      allSuccess = allSuccess && success;
+      if (!success) {
+        logger.error(
+            "create timeseries for device={} failed, plan={}",
+            insertRowPlan.getDeviceId(),
+            insertRowPlan);
       }
     }
     return allSuccess;
@@ -546,6 +570,10 @@ public class CMManager extends MManager {
       return createTimeseries((InsertMultiTabletPlan) insertPlan);
     }
 
+    if (insertPlan instanceof InsertRowsPlan) {
+      return createTimeseries((InsertRowsPlan) insertPlan);
+    }
+
     List<String> seriesList = new ArrayList<>();
     PartialPath deviceId = insertPlan.getDeviceId();
     PartialPath storageGroupName;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index c573449..acb0b77 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CountPlan;
@@ -52,6 +53,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 public class ClusterPlanRouter {
 
@@ -110,7 +112,9 @@ public class ClusterPlanRouter {
 
   public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan)
       throws UnsupportedPlanException, MetadataException {
-    if (plan instanceof InsertTabletPlan) {
+    if (plan instanceof InsertRowsPlan) {
+      return splitAndRoutePlan((InsertRowsPlan) plan);
+    } else if (plan instanceof InsertTabletPlan) {
       return splitAndRoutePlan((InsertTabletPlan) plan);
     } else if (plan instanceof InsertMultiTabletPlan) {
       return splitAndRoutePlan((InsertMultiTabletPlan) plan);
@@ -227,6 +231,35 @@ public class ClusterPlanRouter {
     return result;
   }
 
+  /**
+   * @param insertRowsPlan InsertRowsPlan
+   * @return key is InsertRowsPlan, value is the partition group the plan belongs to, all
+   *     InsertRowPlans in InsertRowsPlan belongs to one same storage group.
+   */
+  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowsPlan insertRowsPlan)
+      throws MetadataException {
+    Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
+    Map<PartitionGroup, InsertRowsPlan> groupPlanMap = new HashMap<>();
+    for (int i = 0; i < insertRowsPlan.getInsertRowPlanList().size(); i++) {
+      InsertRowPlan rowPlan = insertRowsPlan.getInsertRowPlanList().get(i);
+      PartialPath storageGroup = getMManager().getStorageGroupPath(rowPlan.getDeviceId());
+      PartitionGroup group = partitionTable.route(storageGroup.getFullPath(), rowPlan.getTime());
+      if (groupPlanMap.containsKey(group)) {
+        InsertRowsPlan tmpPlan = groupPlanMap.get(group);
+        tmpPlan.addOneInsertRowPlan(rowPlan, i);
+      } else {
+        InsertRowsPlan tmpPlan = new InsertRowsPlan();
+        tmpPlan.addOneInsertRowPlan(rowPlan, i);
+        groupPlanMap.put(group, tmpPlan);
+      }
+    }
+
+    for (Entry<PartitionGroup, InsertRowsPlan> entry : groupPlanMap.entrySet()) {
+      result.put(entry.getValue(), entry.getKey());
+    }
+    return result;
+  }
+
   @SuppressWarnings("SuspiciousSystemArraycopy")
   private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertTabletPlan plan)
       throws MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 76defce..af24983 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -99,6 +100,13 @@ public interface IPlanExecutor {
   /**
    * execute insert command and return whether the operator is successful.
    *
+   * @param insertRowsPlan physical insert rows plan, which contains multi insertRowPlans
+   */
+  void insert(InsertRowsPlan insertRowsPlan) throws QueryProcessException;
+
+  /**
+   * execute insert command and return whether the operator is successful.
+   *
    * @param insertRowsOfOneDevicePlan physical insert plan
    */
   void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 299bcbb..0eb5e7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
@@ -115,6 +116,8 @@ import org.apache.iotdb.db.utils.AuthUtils;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
@@ -221,6 +224,9 @@ public class PlanExecutor implements IPlanExecutor {
       case BATCH_INSERT_ONE_DEVICE:
         insert((InsertRowsOfOneDevicePlan) plan);
         return true;
+      case BATCH_INSERT_ROWS:
+        insert((InsertRowsPlan) plan);
+        return true;
       case BATCHINSERT:
         insertTablet((InsertTabletPlan) plan);
         return true;
@@ -1095,6 +1101,27 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   @Override
+  public void insert(InsertRowsPlan plan) throws QueryProcessException {
+    boolean allSuccess = true;
+    for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
+      if (plan.getResults().containsKey(i)) {
+        allSuccess = false;
+        continue;
+      }
+      try {
+        insert(plan.getInsertRowPlanList().get(i));
+        plan.getResults().put(i, RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      } catch (QueryProcessException e) {
+        plan.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
+        allSuccess = false;
+      }
+      if (!allSuccess) {
+        throw new BatchProcessException(plan.getResults().values().toArray(new TSStatus[0]));
+      }
+    }
+  }
+
+  @Override
   public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException {
     try {
       insertRowPlan.setMeasurementMNodes(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 3be1099..d1e9971 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -145,6 +145,7 @@ public abstract class Operator {
     MEASUREMENT_MNODE,
     STORAGE_GROUP_MNODE,
     BATCH_INSERT_ONE_DEVICE,
-    MULTI_BATCH_INSERT;
+    MULTI_BATCH_INSERT,
+    BATCH_INSERT_ROWS
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 29888ea..b329812 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
@@ -348,6 +349,10 @@ public abstract class PhysicalPlan {
           plan = new StorageGroupMNodePlan();
           plan.deserialize(buffer);
           break;
+        case BATCH_INSERT_ROWS:
+          plan = new InsertRowsPlan();
+          plan.deserialize(buffer);
+          break;
         default:
           throw new IOException("unrecognized log type " + type);
       }
@@ -390,7 +395,8 @@ public abstract class PhysicalPlan {
     MEASUREMENT_MNODE,
     STORAGE_GROUP_MNODE,
     BATCH_INSERT_ONE_DEVICE,
-    MULTI_BATCH_INSERT
+    MULTI_BATCH_INSERT,
+    BATCH_INSERT_ROWS
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index 58b80c8..8399527 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -290,8 +290,11 @@ public class InsertRowPlan extends InsertPlan {
   public void serialize(DataOutputStream stream) throws IOException {
     int type = PhysicalPlanType.INSERT.ordinal();
     stream.writeByte((byte) type);
-    stream.writeLong(time);
+    subSerialize(stream);
+  }
 
+  public void subSerialize(DataOutputStream stream) throws IOException {
+    stream.writeLong(time);
     putString(stream, deviceId.getFullPath());
     serializeMeasurementsAndValues(stream);
   }
@@ -435,8 +438,11 @@ public class InsertRowPlan extends InsertPlan {
   public void serialize(ByteBuffer buffer) {
     int type = PhysicalPlanType.INSERT.ordinal();
     buffer.put((byte) type);
-    buffer.putLong(time);
+    subSerialize(buffer);
+  }
 
+  public void subSerialize(ByteBuffer buffer) {
+    buffer.putLong(time);
     putString(buffer, deviceId.getFullPath());
     serializeMeasurementsAndValues(buffer);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index e3211f4..e13dc83 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -108,11 +108,10 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan {
 
   @Override
   public void serialize(ByteBuffer buffer) {
-    int type = PhysicalPlanType.INSERT.ordinal();
+    int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
     buffer.put((byte) type);
 
     putString(buffer, deviceId.getFullPath());
-
     buffer.putInt(rowPlans.length);
     for (InsertRowPlan plan : rowPlans) {
       buffer.putLong(plan.getTime());
@@ -122,7 +121,6 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan {
 
   @Override
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
-
     this.deviceId = new PartialPath(readString(buffer));
     this.rowPlans = new InsertRowPlan[buffer.getInt()];
     for (int i = 0; i < rowPlans.length; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
new file mode 100644
index 0000000..5e2f484
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.crud;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class InsertRowsPlan extends InsertPlan {
+  /**
+   * Suppose there is an InsertRowsPlan, which contains 5 InsertRowPlans,
+   * insertRowPlanList={InsertRowPlan_0, InsertRowPlan_1, InsertRowPlan_2, InsertRowPlan_3,
+   * InsertRowPlan_4}, then the insertRowPlanIndexList={0, 1, 2, 3, 4} respectively. But when the
+   * InsertRowsPlan is split into two InsertRowsPlans according to different storage group in
+   * cluster version, suppose that the InsertRowsPlan_1's insertRowPlanList = {InsertRowPlan_0,
+   * InsertRowPlan_3, InsertRowPlan_4}, then InsertRowsPlan_1's insertRowPlanIndexList = {0, 3, 4};
+   * InsertRowsPlan_2's insertRowPlanList = {InsertRowPlan_1, * InsertRowPlan_2} then
+   * InsertRowsPlan_2's insertRowPlanIndexList= {1, 2} respectively;
+   */
+  private List<Integer> insertRowPlanIndexList;
+
+  /** the InsertRowsPlan list */
+  private List<InsertRowPlan> insertRowPlanList;
+
+  /** record the result of insert rows */
+  private Map<Integer, TSStatus> results = new HashMap<>();
+
+  public InsertRowsPlan() {
+    super(OperatorType.BATCH_INSERT_ROWS);
+    insertRowPlanList = new ArrayList<>();
+    insertRowPlanIndexList = new ArrayList<>();
+  }
+
+  @Override
+  public long getMinTime() {
+    long minTime = Long.MAX_VALUE;
+    for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+      if (insertRowPlan.getMinTime() < minTime) {
+        minTime = insertRowPlan.getMinTime();
+      }
+    }
+    return minTime;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    List<PartialPath> result = new ArrayList<>();
+    for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+      result.addAll(insertRowPlan.getPaths());
+    }
+    return result;
+  }
+
+  @Override
+  public void checkIntegrity() throws QueryProcessException {
+    for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+      insertRowPlan.checkIntegrity();
+    }
+  }
+
+  @Override
+  public void recoverFromFailure() {
+    for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+      insertRowPlan.recoverFromFailure();
+    }
+  }
+
+  @Override
+  public InsertPlan getPlanFromFailed() {
+    if (super.getPlanFromFailed() == null) {
+      return null;
+    }
+    List<InsertRowPlan> plans = new ArrayList<>();
+    List<Integer> indexes = new ArrayList<>();
+    for (int i = 0; i < insertRowPlanList.size(); i++) {
+      if (insertRowPlanList.get(i).hasFailedValues()) {
+        plans.add((InsertRowPlan) insertRowPlanList.get(i).getPlanFromFailed());
+        indexes.add(i);
+      }
+    }
+    this.insertRowPlanList = plans;
+    this.insertRowPlanIndexList = indexes;
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    InsertRowsPlan that = (InsertRowsPlan) o;
+
+    if (!Objects.equals(insertRowPlanIndexList, that.insertRowPlanIndexList)) {
+      return false;
+    }
+    if (!Objects.equals(insertRowPlanList, that.insertRowPlanList)) {
+      return false;
+    }
+    return Objects.equals(results, that.results);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = insertRowPlanIndexList != null ? insertRowPlanIndexList.hashCode() : 0;
+    result = 31 * result + (insertRowPlanList != null ? insertRowPlanList.hashCode() : 0);
+    result = 31 * result + (results != null ? results.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    int type = PhysicalPlanType.BATCH_INSERT_ROWS.ordinal();
+    buffer.put((byte) type);
+    buffer.putInt(insertRowPlanList.size());
+    for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+      insertRowPlan.subSerialize(buffer);
+    }
+    for (Integer index : insertRowPlanIndexList) {
+      buffer.putInt(index);
+    }
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    int type = PhysicalPlanType.BATCH_INSERT_ROWS.ordinal();
+    stream.writeByte((byte) type);
+    stream.writeInt(insertRowPlanList.size());
+    for (InsertRowPlan insertRowPlan : insertRowPlanList) {
+      insertRowPlan.subSerialize(stream);
+    }
+    for (Integer index : insertRowPlanIndexList) {
+      stream.writeInt(index);
+    }
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    int size = buffer.getInt();
+    this.insertRowPlanList = new ArrayList<>(size);
+    this.insertRowPlanIndexList = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      InsertRowPlan insertRowPlan = new InsertRowPlan();
+      insertRowPlan.deserialize(buffer);
+      insertRowPlanList.add(insertRowPlan);
+    }
+
+    for (int i = 0; i < size; i++) {
+      insertRowPlanIndexList.add(buffer.getInt());
+    }
+  }
+
+  public Map<Integer, TSStatus> getResults() {
+    return results;
+  }
+
+  public void addOneInsertRowPlan(InsertRowPlan plan, int index) {
+    insertRowPlanList.add(plan);
+    insertRowPlanIndexList.add(index);
+  }
+
+  public List<Integer> getInsertRowPlanIndexList() {
+    return insertRowPlanIndexList;
+  }
+
+  public List<InsertRowPlan> getInsertRowPlanList() {
+    return insertRowPlanList;
+  }
+
+  public int getRowCount() {
+    return insertRowPlanList.size();
+  }
+
+  public PartialPath getFirstDeviceId() {
+    return insertRowPlanList.get(0).getDeviceId();
+  }
+
+  @Override
+  public String toString() {
+    return "InsertRowsPlan{"
+        + " insertRowPlanIndexList="
+        + insertRowPlanIndexList
+        + ", insertRowPlanList="
+        + insertRowPlanList
+        + ", results="
+        + results
+        + "}";
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 55b5a1f..60feccf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -141,6 +142,7 @@ import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -1141,10 +1143,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           req.deviceIds.get(0),
           req.getTimestamps().get(0));
     }
-
-    List<TSStatus> statusList = new ArrayList<>();
-
-    boolean isAllSuccessful = true;
+    boolean allSuccess = true;
+    InsertRowsPlan insertRowsPlan = new InsertRowsPlan();
     for (int i = 0; i < req.deviceIds.size(); i++) {
       try {
         InsertRowPlan plan =
@@ -1154,23 +1154,46 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
                 req.getMeasurementsList().get(i).toArray(new String[0]),
                 req.valuesList.get(i));
         TSStatus status = checkAuthority(plan, req.getSessionId());
-        if (status == null) {
-          status = executeNonQueryPlan(plan);
-          isAllSuccessful =
-              ((status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
-                  && isAllSuccessful);
+        if (status != null) {
+          insertRowsPlan.getResults().put(i, status);
+          allSuccess = false;
         }
-        statusList.add(status);
+        insertRowsPlan.addOneInsertRowPlan(plan, i);
       } catch (Exception e) {
-        isAllSuccessful = false;
-        statusList.add(
-            onNPEOrUnexpectedException(e, "inserting records", TSStatusCode.INTERNAL_SERVER_ERROR));
+        allSuccess = false;
+        insertRowsPlan
+            .getResults()
+            .put(
+                i,
+                onNPEOrUnexpectedException(
+                    e, "inserting records", TSStatusCode.INTERNAL_SERVER_ERROR));
       }
     }
+    TSStatus tsStatus = executeNonQueryPlan(insertRowsPlan);
+
+    return judgeFinalTsStatus(
+        allSuccess, tsStatus, insertRowsPlan.getResults(), req.deviceIds.size());
+  }
+
+  private TSStatus judgeFinalTsStatus(
+      boolean allSuccess,
+      TSStatus executeTsStatus,
+      Map<Integer, TSStatus> checkTsStatus,
+      int totalRowCount) {
+
+    if (allSuccess) {
+      return executeTsStatus;
+    }
 
-    return isAllSuccessful
-        ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
-        : RpcUtils.getStatus(statusList);
+    if (executeTsStatus.subStatus == null) {
+      TSStatus[] tmpSubTsStatus = new TSStatus[totalRowCount];
+      Arrays.fill(tmpSubTsStatus, RpcUtils.SUCCESS_STATUS);
+      executeTsStatus.subStatus = Arrays.asList(tmpSubTsStatus);
+    }
+    for (Entry<Integer, TSStatus> entry : checkTsStatus.entrySet()) {
+      executeTsStatus.subStatus.set(entry.getKey(), entry.getValue());
+    }
+    return RpcUtils.getStatus(executeTsStatus.subStatus);
   }
 
   @Override
@@ -1229,10 +1252,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           req.getTimestamps().get(0));
     }
 
-    List<TSStatus> statusList = new ArrayList<>();
-    InsertRowPlan plan = new InsertRowPlan();
-    boolean isAllSuccessful = true;
+    boolean allSuccess = true;
+    InsertRowsPlan insertRowsPlan = new InsertRowsPlan();
     for (int i = 0; i < req.deviceIds.size(); i++) {
+      InsertRowPlan plan = new InsertRowPlan();
       try {
         plan.setDeviceId(new PartialPath(req.getDeviceIds().get(i)));
         plan.setTime(req.getTimestamps().get(i));
@@ -1241,24 +1264,24 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         plan.setValues(req.getValuesList().get(i).toArray(new Object[0]));
         plan.setNeedInferType(true);
         TSStatus status = checkAuthority(plan, req.getSessionId());
-        if (status == null) {
-          status = executeNonQueryPlan(plan);
-          isAllSuccessful =
-              ((status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())
-                  && isAllSuccessful);
+        if (status != null) {
+          insertRowsPlan.getResults().put(i, status);
         }
-        statusList.add(status);
+        insertRowsPlan.addOneInsertRowPlan(plan, i);
       } catch (Exception e) {
-        isAllSuccessful = false;
-        statusList.add(
-            onNPEOrUnexpectedException(
-                e, "inserting string records", TSStatusCode.INTERNAL_SERVER_ERROR));
+        insertRowsPlan
+            .getResults()
+            .put(
+                i,
+                onNPEOrUnexpectedException(
+                    e, "inserting string records", TSStatusCode.INTERNAL_SERVER_ERROR));
+        allSuccess = false;
       }
     }
+    TSStatus tsStatus = executeNonQueryPlan(insertRowsPlan);
 
-    return isAllSuccessful
-        ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
-        : RpcUtils.getStatus(statusList);
+    return judgeFinalTsStatus(
+        allSuccess, tsStatus, insertRowsPlan.getResults(), req.deviceIds.size());
   }
 
   @Override