You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/24 05:48:12 UTC

[iotdb] 01/01: add insertOneDeviceRecords API in java session

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch new_insert_api
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 906dc397286e643a3eae229efe6ac7c8786f5ac7
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Dec 24 13:47:31 2020 +0800

    add insertOneDeviceRecords API in java session
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  18 +++
 .../engine/storagegroup/StorageGroupProcessor.java |  44 ++++++
 .../apache/iotdb/db/qp/executor/IPlanExecutor.java |   8 ++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  59 +++++++++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   3 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   3 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  31 +++++
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 147 +++++++++++++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  69 +++++++---
 .../crud/InsertRowsOfOneDevicePlanTest.java        |  25 ++++
 .../java/org/apache/iotdb/session/Session.java     | 130 +++++++++++++++++-
 .../org/apache/iotdb/session/pool/SessionPool.java |  57 ++++++++
 .../iotdb/session/IoTDBSessionComplexIT.java       |   1 +
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |  62 +++++++++
 thrift/src/main/thrift/rpc.thrift                  |  12 ++
 15 files changed, 644 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6f796b0..8bd7155 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -120,6 +121,8 @@ public class StorageEngine implements IService {
 
   private ExecutorService recoverAllSgThreadPool;
 
+
+
   static class InstanceHolder {
 
     private InstanceHolder() {
@@ -381,6 +384,21 @@ public class StorageEngine implements IService {
     }
   }
 
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws StorageEngineException {
+    StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
+
+    // TODO monitor: update statistics
+    try {
+      storageGroupProcessor.insert(insertRowsOfOneDevicePlan);
+    } catch (WriteProcessException e) {
+      throw new StorageEngineException(e);
+    }
+
+
+  }
+
+
   /**
    * insert a InsertTabletPlan to a storage group
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfe409d..6f854ee 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -2492,6 +2493,49 @@ public class StorageGroupProcessor {
     return tsFileManagement;
   }
 
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws WriteProcessException {
+
+    if (enableMemControl) {
+      StorageEngine.blockInsertionIfReject();
+    }
+    writeLock();
+    try {
+      boolean isSequence = false;
+      for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        if (!isAlive(plan.getTime())) {
+          //we do not need to write these part of data, as they can not be queried
+          continue;
+        }
+        // init map
+        long timePartitionId = StorageEngine.getTimePartition(plan.getTime());
+
+        partitionLatestFlushedTimeForEachDevice
+            .computeIfAbsent(timePartitionId, id -> new HashMap<>());
+        //as the plans have been ordered, and we have get the write lock,
+        //So, if a plan is sequenced, then all the rest plans are sequenced.
+        //
+        if (!isSequence) {
+          isSequence =
+              plan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
+                  .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+        }
+        //is unsequence and user set config to discard out of order data
+        if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
+            .isEnableDiscardOutOfOrderData()) {
+          return;
+        }
+
+        latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
+        // insert to sequence or unSequence file
+        insertToTsFileProcessor(plan, isSequence);
+      }
+    } finally {
+      writeUnlock();
+    }
+
+  }
+
   private enum LoadTsFileType {
     LOAD_SEQUENCE, LOAD_UNSEQUENCE
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 744e57b..7a893af 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -92,6 +93,13 @@ public interface IPlanExecutor {
   void insert(InsertRowPlan insertRowPlan) throws QueryProcessException;
 
   /**
+   * execute insert command and return whether the operator is successful.
+   *
+   * @param insertRowsOfOneDevicePlan physical insert plan
+   */
+  void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
+
+  /**
    * execute batch insert plan
    *
    * @return result of each row
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 66a6dcf..cfc251c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -90,6 +90,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -195,6 +196,9 @@ public class PlanExecutor implements IPlanExecutor {
       case INSERT:
         insert((InsertRowPlan) plan);
         return true;
+      case BATCH_INSERT_ONE_DEVICE:
+        insert((InsertRowsOfOneDevicePlan) plan);
+        return true;
       case BATCHINSERT:
         insertTablet((InsertTabletPlan) plan);
         return true;
@@ -876,6 +880,7 @@ public class PlanExecutor implements IPlanExecutor {
 
       insertRowPlan
           .setMeasurementMNodes(new MeasurementMNode[insertRowPlan.getMeasurements().length]);
+      // check whether types are match
       getSeriesSchemas(insertRowPlan);
       insertRowPlan.transferType();
       StorageEngine.getInstance().insert(insertRowPlan);
@@ -907,6 +912,60 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   @Override
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws QueryProcessException {
+    try {
+      for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        plan.setMeasurementMNodes(new MeasurementMNode[plan.getMeasurements().length]);
+        // check whether types are match
+        getSeriesSchemas(plan);
+        //we do not need to infer data type for insertRowsOfOneDevicePlan
+      }
+      //ok, we can begin to write data into the engine..
+      StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
+
+      List<String> notExistedPaths = null;
+      List<String> failedMeasurements = null;
+       for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        if (plan.getFailedMeasurements() != null) {
+          if (notExistedPaths == null) {
+            notExistedPaths = new ArrayList<>();
+            failedMeasurements = new ArrayList<>();
+          }
+          // check if all path not exist exceptions
+          List<String> failedPaths = plan.getFailedMeasurements();
+          List<Exception> exceptions = plan.getFailedExceptions();
+          boolean isPathNotExistException = true;
+          for (Exception e : exceptions) {
+            Throwable curException = e;
+            while (curException.getCause() != null) {
+              curException = curException.getCause();
+            }
+            if (!(curException instanceof PathNotExistException)) {
+              isPathNotExistException = false;
+              break;
+            }
+          }
+          if (isPathNotExistException) {
+            notExistedPaths.addAll(failedPaths);
+          } else {
+            failedMeasurements.addAll(plan.getFailedMeasurements());
+          }
+        }
+      }
+      if (notExistedPaths != null && !notExistedPaths.isEmpty()) {
+        throw new PathNotExistException(notExistedPaths);
+      } else if (notExistedPaths != null && !failedMeasurements.isEmpty()) {
+        throw new StorageEngineException(
+            "failed to insert points " + failedMeasurements);
+      }
+
+    } catch (StorageEngineException | MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+  }
+
+  @Override
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
     try {
       insertTabletPlan
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index b268365..9439913 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -77,6 +77,7 @@ public abstract class Operator {
     TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL,
     ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE,
     SHOW_MERGE_STATUS, CREATE_SCHEMA_SNAPSHOT, TRACING, DELETE_PARTITION,
-    CREATE_MULTI_TIMESERIES
+    CREATE_MULTI_TIMESERIES,
+    BATCH_INSERT_ONE_DEVICE,
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 649bbac..c2842f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -298,7 +298,8 @@ public abstract class PhysicalPlan {
     REVOKE_WATERMARK_EMBEDDING, CREATE_ROLE, DELETE_ROLE, CREATE_USER, REVOKE_USER_ROLE, REVOKE_ROLE_PRIVILEGE,
     REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
     DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, MULTI_CREATE_TIMESERIES,
-    ALTER_TIMESERIES, FLUSH
+    ALTER_TIMESERIES, FLUSH,
+    BATCH_INSERT_ONE_DEVICE,
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index b41311e..3f96479 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -90,6 +90,20 @@ public class InsertRowPlan extends InsertPlan {
     isNeedInferType = true;
   }
 
+
+  public InsertRowPlan(PartialPath deviceId, long insertTime, String[] measurementList,
+      ByteBuffer  values) throws QueryProcessException {
+    super(Operator.OperatorType.INSERT);
+    this.time = insertTime;
+    this.deviceId = deviceId;
+    this.measurements = measurementList;
+    this.dataTypes = new TSDataType[measurementList.length];
+    this.values = new Object[measurementList.length];
+    this.fillValues(values);
+    isNeedInferType = false;
+  }
+
+
   @TestOnly
   public InsertRowPlan(PartialPath deviceId, long insertTime, String[] measurements,
       TSDataType[] dataTypes, String[] insertValues) {
@@ -257,6 +271,10 @@ public class InsertRowPlan extends InsertPlan {
 
     putString(stream, deviceId.getFullPath());
 
+    serializeMeasurementsAndValues(stream);
+  }
+
+  void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException {
     stream.writeInt(
         measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
@@ -278,6 +296,7 @@ public class InsertRowPlan extends InsertPlan {
     stream.writeLong(index);
   }
 
+
   private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
     for (int i = 0; i < values.length; i++) {
       // types are not determined, the situation mainly occurs when the plan uses string values
@@ -397,6 +416,10 @@ public class InsertRowPlan extends InsertPlan {
 
     putString(buffer, deviceId.getFullPath());
 
+    serializeMeasurementsAndValues(buffer);
+  }
+
+  void serializeMeasurementsAndValues(ByteBuffer buffer) {
     buffer
         .putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
@@ -422,6 +445,10 @@ public class InsertRowPlan extends InsertPlan {
     this.time = buffer.getLong();
     this.deviceId = new PartialPath(readString(buffer));
 
+    deserializeMeasurementsAndValues(buffer);
+  }
+
+  void deserializeMeasurementsAndValues(ByteBuffer buffer) {
     int measurementSize = buffer.getInt();
 
     this.measurements = new String[measurementSize];
@@ -464,4 +491,8 @@ public class InsertRowPlan extends InsertPlan {
     failedValues = null;
     return this;
   }
+
+  boolean hasFailedValues() {
+    return failedValues != null && !failedValues.isEmpty();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
new file mode 100644
index 0000000..0bc6592
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.crud;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+
+public class InsertRowsOfOneDevicePlan extends InsertPlan {
+
+  private InsertRowPlan[] rowPlans;
+
+  public InsertRowsOfOneDevicePlan(PartialPath deviceId, Long[] insertTimes,
+      List<List<String>> measurements,
+      ByteBuffer[] insertValues) throws QueryProcessException {
+    super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+    this.deviceId = deviceId;
+    rowPlans = new InsertRowPlan[insertTimes.length];
+    for (int i = 0; i < insertTimes.length; i++) {
+      rowPlans[i] = new InsertRowPlan(deviceId, insertTimes[i], measurements.get(i).toArray(new String[0]), insertValues[i]);
+      if (rowPlans[i].getMeasurements().length == 0) {
+        throw new QueryProcessException(
+            "The measurements are null, deviceId:" + deviceId
+                + ", time:" + insertTimes[i]);
+      }
+      if (rowPlans[i].getValues().length == 0) {
+        throw new QueryProcessException(
+            "The size of values in InsertRowsOfOneDevicePlan is 0, deviceId:" + deviceId
+                + ", time:" + insertTimes[i]);
+      }
+    }
+  }
+
+  //TODO do we need to rewrite hashCode?
+
+  @Override
+  public List<PartialPath> getPaths() {
+    Set<PartialPath> paths = new HashSet<>();
+    for (InsertRowPlan plan : rowPlans) {
+      paths.addAll(plan.getPaths());
+    }
+    return new ArrayList<>(paths);
+  }
+
+  @Override
+  public long getMinTime() {
+    long minTime = Long.MAX_VALUE;
+    for (InsertRowPlan plan : rowPlans) {
+      if (minTime > plan.getTime()) {
+        minTime = plan.getTime();
+      }
+    }
+    return minTime;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
+    stream.writeByte((byte) type);
+    putString(stream, deviceId.getFullPath());
+
+    stream.writeInt(rowPlans.length);
+    for (InsertRowPlan plan : rowPlans) {
+      stream.writeLong(plan.getTime());
+      plan.serializeMeasurementsAndValues(stream);
+    }
+  }
+
+
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    int type = PhysicalPlanType.INSERT.ordinal();
+    buffer.put((byte) type);
+
+    putString(buffer, deviceId.getFullPath());
+
+    buffer.putInt(rowPlans.length);
+    for (InsertRowPlan plan : rowPlans) {
+      buffer.putLong(plan.getTime());
+      plan.serializeMeasurementsAndValues(buffer);
+    }
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+
+    this.deviceId = new PartialPath(readString(buffer));
+    this.rowPlans = new InsertRowPlan[buffer.getInt()];
+    for (int i = 0; i < rowPlans.length; i ++) {
+      rowPlans[i] = new InsertRowPlan();
+      rowPlans[i].setDeviceId(deviceId);
+      rowPlans[i].setTime(buffer.getLong());
+      rowPlans[i].deserializeMeasurementsAndValues(buffer);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "deviceId: " + deviceId + ", times: " + rowPlans.length ;
+  }
+
+
+  @Override
+  public InsertPlan getPlanFromFailed() {
+    if (super.getPlanFromFailed() == null) {
+      return null;
+    }
+    List<InsertRowPlan> plans = new ArrayList<>();
+    for (InsertRowPlan plan : rowPlans) {
+      if (plan.hasFailedValues()) {
+        plans.add((InsertRowPlan) plan.getPlanFromFailed());
+      }
+    }
+    this.rowPlans = plans.toArray(new InsertRowPlan[0]);
+    return this;
+  }
+
+
+  public InsertRowPlan[] getRowPlans() {
+    return rowPlans;
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 76f4f59..7f2bf8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -114,6 +115,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertOneDeviceRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
@@ -1256,14 +1258,47 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     for (int i = 0; i < req.deviceIds.size(); i++) {
       try {
-        InsertRowPlan plan = new InsertRowPlan();
-        plan.setDeviceId(new PartialPath(req.getDeviceIds().get(i)));
-        plan.setTime(req.getTimestamps().get(i));
-        plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
-        plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
-        plan.setValues(new Object[plan.getMeasurements().length]);
-        plan.fillValues(req.valuesList.get(i));
-        plan.setNeedInferType(false);
+        InsertRowPlan plan = new InsertRowPlan(
+            new PartialPath(req.getDeviceIds().get(i)), req.getTimestamps().get(i),
+            req.getMeasurementsList().get(i).toArray(new String[0]), req.valuesList.get(i)
+        );
+        TSStatus status = checkAuthority(plan, req.getSessionId());
+        if (status != null) {
+          statusList.add(status);
+        } else {
+          statusList.add(executeNonQueryPlan(plan));
+        }
+      } catch (Exception e) {
+        logger.error("meet error when insert in batch", e);
+        statusList.add(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+      }
+    }
+
+    return RpcUtils.getStatus(statusList);
+  }
+
+  @Override
+  public TSStatus insertOneDeviceRecords(TSInsertOneDeviceRecordsReq req) throws TException {
+    if (auditLogger.isDebugEnabled()) {
+      auditLogger
+          .debug("Session {} insertRecords, device {}, first time {}", currSessionId.get(),
+              req.deviceId, req.getTimestamps().get(0));
+    }
+    if (!checkLogin(req.getSessionId())) {
+      logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+      return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+    }
+
+    List<TSStatus> statusList = new ArrayList<>();
+
+    for (int i = 0; i < req.measurementsList.size(); i++) {
+      try {
+        InsertRowsOfOneDevicePlan plan = new InsertRowsOfOneDevicePlan(
+            new PartialPath(req.getDeviceId()),
+            req.getTimestamps().toArray(new Long[0]),
+            req.getMeasurementsList(),
+            req.getValuesList().toArray(new ByteBuffer[0])
+        );
         TSStatus status = checkAuthority(plan, req.getSessionId());
         if (status != null) {
           statusList.add(status);
@@ -1348,6 +1383,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   @Override
+  public TSStatus testInsertOneDeviceRecords(TSInsertOneDeviceRecordsReq req) throws TException {
+    logger.debug("Test insert rows in batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
   public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) throws TException {
     logger.debug("Test insert string records request receive.");
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -1364,14 +1405,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
       }
 
-      InsertRowPlan plan = new InsertRowPlan();
-      plan.setDeviceId(new PartialPath(req.getDeviceId()));
-      plan.setTime(req.getTimestamp());
-      plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
-      plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
-      plan.setValues(new Object[plan.getMeasurements().length]);
-      plan.fillValues(req.values);
-      plan.setNeedInferType(false);
+      InsertRowPlan plan = new InsertRowPlan(
+          new PartialPath(req.getDeviceId()), req.getTimestamp(),
+          req.getMeasurements().toArray(new String[0]), req.values
+      );
 
       TSStatus status = checkAuthority(plan, req.getSessionId());
       if (status != null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java
new file mode 100644
index 0000000..f7aed72
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.crud;
+
+public class InsertRowsOfOneDevicePlanTest {
+
+
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index d4abb13..8e59b6f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +37,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertOneDeviceRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
@@ -379,6 +381,117 @@ public class Session {
     }
   }
 
+  /**
+   * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
+   * executeBatch, we pack some insert request in batch and send them to server. If you want improve
+   * your performance, please see insertTablet method
+   * <p>
+   * Each row is independent, which could have different deviceId, time, number of measurements
+   *
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+     insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, false);
+  }
+  /**
+   * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
+   * executeBatch, we pack some insert request in batch and send them to server. If you want improve
+   * your performance, please see insertTablet method
+   * <p>
+   * Each row is independent, which could have different deviceId, time, number of measurements
+   *
+   * @param haveSorted  whether the times have been sorted
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertOneDeviceRecordsReq request = genTSInsertOneDeviceRecordsReq(deviceId, times, measurementsList,
+        typesList, valuesList, haveSorted);
+    try {
+      RpcUtils.verifySuccess(client.insertOneDeviceRecords(request));
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertOneDeviceRecords(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(
+            "Fail to reconnect to server. Please check server status");
+      }
+    }
+  }
+
+  private TSInsertOneDeviceRecordsReq genTSInsertOneDeviceRecordsReq(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, BatchExecutionException {
+    // check params size
+    int len = times.size();
+    if (len != measurementsList.size() || len != valuesList.size()) {
+      throw new IllegalArgumentException(
+          "times, measurementsList and valuesList's size should be equal");
+    }
+
+    if (haveSorted) {
+      if (!checkSorted(times)) {
+        throw new BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
+      }
+    } else {
+      sortRecordsInOneDevice(times, measurementsList, typesList, valuesList);
+    }
+
+    TSInsertOneDeviceRecordsReq request = new TSInsertOneDeviceRecordsReq();
+    request.setSessionId(sessionId);
+    request.setDeviceId(deviceId);
+    request.setTimestamps(times);
+    request.setMeasurementsList(measurementsList);
+    List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
+    request.setValuesList(buffersList);
+    return request;
+  }
+
+  private void sortRecordsInOneDevice(List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
+    Integer[] index = new Integer[times.size()];
+    for (int i = 0; i < times.size(); i++) {
+      index[i] = i;
+    }
+    Arrays.sort(index, Comparator.comparingLong(times::get));
+    times.sort(Long::compareTo);
+    //sort measurementList
+    sortList(measurementsList, index);
+    //sort typesList
+    sortList(typesList, index);
+    //sort values
+    sortList(valuesList, index);
+  }
+
+  private void sortList(List source, Integer[] index) {
+    List tmpLists = new ArrayList<>(source);
+
+    for (int i = 0; i < index.length; i++) {
+      source.set(i, tmpLists.get(index[i]));
+    }
+  }
+
+  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList)
+      throws IoTDBConnectionException {
+    List<ByteBuffer> buffersList = new ArrayList<>();
+    for (int i = 0; i < valuesList.size(); i++) {
+      ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
+      putValues(typesList.get(i), valuesList.get(i), buffer);
+      buffer.flip();
+      buffersList.add(buffer);
+    }
+    return buffersList;
+  }
+
+
   private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList) throws IoTDBConnectionException {
@@ -394,13 +507,7 @@ public class Session {
     request.setDeviceIds(deviceIds);
     request.setTimestamps(times);
     request.setMeasurementsList(measurementsList);
-    List<ByteBuffer> buffersList = new ArrayList<>();
-    for (int i = 0; i < measurementsList.size(); i++) {
-      ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
-      putValues(typesList.get(i), valuesList.get(i), buffer);
-      buffer.flip();
-      buffersList.add(buffer);
-    }
+    List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
     request.setValuesList(buffersList);
     return request;
   }
@@ -1064,6 +1171,15 @@ public class Session {
     return true;
   }
 
+  private boolean checkSorted(List<Long> times) {
+    for (int i = 1; i < times.size(); i++) {
+      if (times.get(i) < times.get(i - 1)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   public void sortTablet(Tablet tablet) {
     /*
      * following part of code sort the batch data by time,
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 0f8bb56..72ac8d1 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -415,6 +415,63 @@ public class SessionPool {
 
 
   /**
+   * Insert data that belong to the same device in batch format, which can reduce the overhead of
+   * network. This method is just like jdbc batch insert, we pack some insert request in batch and
+   * send them to server If you want
+   * improve your performance, please see insertTablet method
+   *
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, false);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertOneDeviceRecords failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Insert data that belong to the same device in batch format, which can reduce the overhead of
+   * network. This method is just like jdbc batch insert, we pack some insert request in batch and
+   * send them to server If you want
+   * improve your performance, please see insertTablet method
+   * @param  haveSorted whether the times list has been ordered.
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, haveSorted);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertOneDeviceRecords failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+
+  /**
    * Insert data in batch format, which can reduce the overhead of network. This method is just like
    * jdbc batch insert, we pack some insert request in batch and send them to server If you want
    * improve your performance, please see insertTablet method
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index ad59aea..a0b6123 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -779,4 +779,5 @@ public class IoTDBSessionComplexIT {
       Assert.assertEquals(700, count);
     }
   }
+
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
index 09849c8..f220234 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
@@ -388,4 +388,66 @@ public class IoTDBSessionSimpleIT {
     session.deleteStorageGroup(storageGroup);
     session.close();
   }
+
+  @Test
+  public void testInsertOneDeviceRecords()
+      throws IoTDBConnectionException, StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    List<Long> times = new ArrayList<>();
+    List<List<String>> measurements = new ArrayList<>();
+    List<List<TSDataType>> datatypes = new ArrayList<>();
+    List<List<Object>> values = new ArrayList<>();
+
+    List<String> tmpMeasurements = null;
+    List<TSDataType> tmpDataTypes = null;
+    List<Object> tmpValues = null;
+
+    times.add(1L);
+    tmpMeasurements = new ArrayList<>();
+    tmpDataTypes = new ArrayList<>();
+    tmpValues = new ArrayList<>();
+    tmpMeasurements.add("s1");
+    tmpMeasurements.add("s2");
+    tmpDataTypes.add(TSDataType.INT32);
+    tmpDataTypes.add(TSDataType.INT32);
+    tmpValues.add(1);
+    tmpValues.add(2);
+    measurements.add(tmpMeasurements);
+    datatypes.add(tmpDataTypes);
+    values.add(tmpValues);
+
+
+    times.add(2L);
+    tmpMeasurements = new ArrayList<>();
+    tmpDataTypes = new ArrayList<>();
+    tmpValues = new ArrayList<>();
+    tmpMeasurements.add("s2");
+    tmpMeasurements.add("s3");
+    tmpDataTypes.add(TSDataType.INT32);
+    tmpDataTypes.add(TSDataType.INT64);
+    tmpValues.add(3);
+    tmpValues.add(4L);
+    measurements.add(tmpMeasurements);
+    datatypes.add(tmpDataTypes);
+    values.add(tmpValues);
+
+    times.add(3L);
+    tmpMeasurements = new ArrayList<>();
+    tmpDataTypes = new ArrayList<>();
+    tmpValues = new ArrayList<>();
+    tmpMeasurements.add("s4");
+    tmpMeasurements.add("s5");
+    tmpDataTypes.add(TSDataType.FLOAT);
+    tmpDataTypes.add(TSDataType.BOOLEAN);
+    tmpValues.add(5.0f);
+    tmpValues.add(Boolean.TRUE);
+    measurements.add(tmpMeasurements);
+    datatypes.add(tmpDataTypes);
+    values.add(tmpValues);
+
+    session.insertOneDeviceRecords("root.sg.d1", times, measurements, datatypes, values);
+    session.close();
+
+  }
 }
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 4f412bf..45eda3d 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -225,6 +225,14 @@ struct TSInsertRecordsReq {
     5: required list<i64> timestamps
 }
 
+struct TSInsertOneDeviceRecordsReq {
+    1: required i64 sessionId
+    2: required string deviceId
+    3: required list<list<string>> measurementsList
+    4: required list<binary> valuesList
+    5: required list<i64> timestamps
+}
+
 struct TSInsertStringRecordsReq {
     1: required i64 sessionId
     2: required list<string> deviceIds
@@ -326,6 +334,8 @@ service TSIService {
 
 	TSStatus insertRecords(1:TSInsertRecordsReq req);
 
+	TSStatus insertOneDeviceRecords(1:TSInsertOneDeviceRecordsReq req);
+
 	TSStatus insertStringRecords(1:TSInsertStringRecordsReq req);
 
 	TSStatus testInsertTablet(1:TSInsertTabletReq req);
@@ -338,6 +348,8 @@ service TSIService {
 
   TSStatus testInsertRecords(1:TSInsertRecordsReq req);
 
+  TSStatus testInsertOneDeviceRecords(1:TSInsertOneDeviceRecordsReq req);
+
   TSStatus testInsertStringRecords(1:TSInsertStringRecordsReq req);
 
 	TSStatus deleteData(1:TSDeleteDataReq req);