You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/24 05:48:11 UTC

[iotdb] branch new_insert_api created (now 906dc39)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch new_insert_api
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 906dc39  add insertOneDeviceRecords API in java session

This branch includes the following new commits:

     new 906dc39  add insertOneDeviceRecords API in java session

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add insertOneDeviceRecords API in java session

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch new_insert_api
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 906dc397286e643a3eae229efe6ac7c8786f5ac7
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Dec 24 13:47:31 2020 +0800

    add insertOneDeviceRecords API in java session
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  18 +++
 .../engine/storagegroup/StorageGroupProcessor.java |  44 ++++++
 .../apache/iotdb/db/qp/executor/IPlanExecutor.java |   8 ++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  59 +++++++++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   3 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   3 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  31 +++++
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 147 +++++++++++++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  69 +++++++---
 .../crud/InsertRowsOfOneDevicePlanTest.java        |  25 ++++
 .../java/org/apache/iotdb/session/Session.java     | 130 +++++++++++++++++-
 .../org/apache/iotdb/session/pool/SessionPool.java |  57 ++++++++
 .../iotdb/session/IoTDBSessionComplexIT.java       |   1 +
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |  62 +++++++++
 thrift/src/main/thrift/rpc.thrift                  |  12 ++
 15 files changed, 644 insertions(+), 25 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6f796b0..8bd7155 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -120,6 +121,8 @@ public class StorageEngine implements IService {
 
   private ExecutorService recoverAllSgThreadPool;
 
+
+
   static class InstanceHolder {
 
     private InstanceHolder() {
@@ -381,6 +384,21 @@ public class StorageEngine implements IService {
     }
   }
 
+  /**
+   * insert all rows of an InsertRowsOfOneDevicePlan through the processor found for its device
+   */
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws StorageEngineException {
+    StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
+    // TODO monitor: update statistics
+    try {
+      storageGroupProcessor.insert(insertRowsOfOneDevicePlan);
+    } catch (WriteProcessException e) {
+      throw new StorageEngineException(e);
+    }
+  }
+
+
   /**
    * insert a InsertTabletPlan to a storage group
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index bfe409d..6f854ee 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -2492,6 +2493,49 @@ public class StorageGroupProcessor {
     return tsFileManagement;
   }
 
+  /**
+   * Insert all rows of one device while holding the write lock once. Rows that are no longer
+   * alive (see isAlive) are skipped. Whether a row goes to the sequence or the unsequence
+   * file is decided per row against the latest flushed time of the row's own time partition.
+   * @param insertRowsOfOneDevicePlan rows of a single device
+   * @throws WriteProcessException propagated from the insertion of a row
+   */
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws WriteProcessException {
+    if (enableMemControl) {
+      StorageEngine.blockInsertionIfReject();
+    }
+    boolean discardOutOfOrder =
+        IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData();
+    writeLock();
+    try {
+      for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        if (!isAlive(plan.getTime())) {
+          // we do not need to write this part of data, as it can not be queried
+          continue;
+        }
+        // init map
+        long timePartitionId = StorageEngine.getTimePartition(plan.getTime());
+        long latestFlushedTime = partitionLatestFlushedTimeForEachDevice
+            .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+            .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+        // The flushed time is looked up per time partition, so a sequence row in one partition
+        // says nothing about rows in another one: decide for every row.
+        boolean isSequence = plan.getTime() > latestFlushedTime;
+        // is unsequence and user set config to discard out of order data: drop only this row,
+        // later rows may still be in order
+        if (!isSequence && discardOutOfOrder) {
+          continue;
+        }
+        latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
+        // insert to sequence or unSequence file
+        insertToTsFileProcessor(plan, isSequence);
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   private enum LoadTsFileType {
     LOAD_SEQUENCE, LOAD_UNSEQUENCE
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 744e57b..7a893af 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -92,6 +93,13 @@ public interface IPlanExecutor {
   void insert(InsertRowPlan insertRowPlan) throws QueryProcessException;
 
   /**
+   * execute an insert of several rows that all belong to one device; failure is reported by exception.
+   *
+   * @param insertRowsOfOneDevicePlan physical insert plan
+   */
+  void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
+
+  /**
    * execute batch insert plan
    *
    * @return result of each row
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 66a6dcf..cfc251c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -90,6 +90,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -195,6 +196,9 @@ public class PlanExecutor implements IPlanExecutor {
       case INSERT:
         insert((InsertRowPlan) plan);
         return true;
+      case BATCH_INSERT_ONE_DEVICE:
+        insert((InsertRowsOfOneDevicePlan) plan);
+        return true;
       case BATCHINSERT:
         insertTablet((InsertTabletPlan) plan);
         return true;
@@ -876,6 +880,7 @@ public class PlanExecutor implements IPlanExecutor {
 
       insertRowPlan
           .setMeasurementMNodes(new MeasurementMNode[insertRowPlan.getMeasurements().length]);
+      // check whether types are match
       getSeriesSchemas(insertRowPlan);
       insertRowPlan.transferType();
       StorageEngine.getInstance().insert(insertRowPlan);
@@ -907,6 +912,60 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   @Override
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws QueryProcessException {
+    try {
+      for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        plan.setMeasurementMNodes(new MeasurementMNode[plan.getMeasurements().length]);
+        // check whether the types match
+        getSeriesSchemas(plan);
+        // we do not need to infer data type for insertRowsOfOneDevicePlan
+      }
+      // ok, we can begin to write data into the engine..
+      StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
+      // the write has been issued; now gather the measurements that failed in any row
+      List<String> notExistedPaths = null;
+      List<String> failedMeasurements = null;
+       for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        if (plan.getFailedMeasurements() != null) {
+          if (notExistedPaths == null) {
+            notExistedPaths = new ArrayList<>();
+            failedMeasurements = new ArrayList<>();
+          }
+          // a row counts as "path not exist" only if the root cause of every failure is one
+          List<String> failedPaths = plan.getFailedMeasurements();
+          List<Exception> exceptions = plan.getFailedExceptions();
+          boolean isPathNotExistException = true;
+          for (Exception e : exceptions) {
+            Throwable curException = e;
+            while (curException.getCause() != null) {
+              curException = curException.getCause();
+            }
+            if (!(curException instanceof PathNotExistException)) {
+              isPathNotExistException = false;
+              break;
+            }
+          }
+          if (isPathNotExistException) {
+            notExistedPaths.addAll(failedPaths);
+          } else {
+            failedMeasurements.addAll(plan.getFailedMeasurements());
+          }
+        }
+      }
+      // missing paths are reported first; other failures are reported only when no path is missing
+      if (notExistedPaths != null && !notExistedPaths.isEmpty()) {
+        throw new PathNotExistException(notExistedPaths);
+      } else if (notExistedPaths != null && !failedMeasurements.isEmpty()) {
+        throw new StorageEngineException(
+            "failed to insert points " + failedMeasurements);
+      }
+    } catch (StorageEngineException | MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+  }
+
+  @Override
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
     try {
       insertTabletPlan
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index b268365..9439913 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -77,6 +77,7 @@ public abstract class Operator {
     TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL,
     ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE,
     SHOW_MERGE_STATUS, CREATE_SCHEMA_SNAPSHOT, TRACING, DELETE_PARTITION,
-    CREATE_MULTI_TIMESERIES
+    CREATE_MULTI_TIMESERIES,
+    BATCH_INSERT_ONE_DEVICE,
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 649bbac..c2842f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -298,7 +298,8 @@ public abstract class PhysicalPlan {
     REVOKE_WATERMARK_EMBEDDING, CREATE_ROLE, DELETE_ROLE, CREATE_USER, REVOKE_USER_ROLE, REVOKE_ROLE_PRIVILEGE,
     REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
     DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, MULTI_CREATE_TIMESERIES,
-    ALTER_TIMESERIES, FLUSH
+    ALTER_TIMESERIES, FLUSH,
+    BATCH_INSERT_ONE_DEVICE,
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index b41311e..3f96479 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -90,6 +90,20 @@ public class InsertRowPlan extends InsertPlan {
     isNeedInferType = true;
   }
 
+  /** build a row plan whose values are read from a ByteBuffer by fillValues; type inference is off */
+  public InsertRowPlan(PartialPath deviceId, long insertTime, String[] measurementList,
+      ByteBuffer  values) throws QueryProcessException {
+    super(Operator.OperatorType.INSERT);
+    this.time = insertTime;
+    this.deviceId = deviceId;
+    this.measurements = measurementList;
+    this.dataTypes = new TSDataType[measurementList.length];
+    this.values = new Object[measurementList.length];
+    this.fillValues(values);
+    isNeedInferType = false;
+  }
+
+
   @TestOnly
   public InsertRowPlan(PartialPath deviceId, long insertTime, String[] measurements,
       TSDataType[] dataTypes, String[] insertValues) {
@@ -257,6 +271,10 @@ public class InsertRowPlan extends InsertPlan {
 
     putString(stream, deviceId.getFullPath());
 
+    serializeMeasurementsAndValues(stream);
+  }
+
+  void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException {
     stream.writeInt(
         measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
@@ -278,6 +296,7 @@ public class InsertRowPlan extends InsertPlan {
     stream.writeLong(index);
   }
 
+
   private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
     for (int i = 0; i < values.length; i++) {
       // types are not determined, the situation mainly occurs when the plan uses string values
@@ -397,6 +416,10 @@ public class InsertRowPlan extends InsertPlan {
 
     putString(buffer, deviceId.getFullPath());
 
+    serializeMeasurementsAndValues(buffer);
+  }
+
+  void serializeMeasurementsAndValues(ByteBuffer buffer) {
     buffer
         .putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
@@ -422,6 +445,10 @@ public class InsertRowPlan extends InsertPlan {
     this.time = buffer.getLong();
     this.deviceId = new PartialPath(readString(buffer));
 
+    deserializeMeasurementsAndValues(buffer);
+  }
+
+  void deserializeMeasurementsAndValues(ByteBuffer buffer) {
     int measurementSize = buffer.getInt();
 
     this.measurements = new String[measurementSize];
@@ -464,4 +491,8 @@ public class InsertRowPlan extends InsertPlan {
     failedValues = null;
     return this;
   }
+  /** @return true if failedValues is non-null and holds at least one value */
+  boolean hasFailedValues() {
+    return failedValues != null && !failedValues.isEmpty();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
new file mode 100644
index 0000000..0bc6592
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.crud;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+
+/** Physical plan that inserts several rows of a single device; each row is an InsertRowPlan. */
+public class InsertRowsOfOneDevicePlan extends InsertPlan {
+  private InsertRowPlan[] rowPlans;
+
+  /** Builds row i from insertTimes[i], measurements.get(i) and insertValues[i]; rejects empty rows. */
+  public InsertRowsOfOneDevicePlan(PartialPath deviceId, Long[] insertTimes,
+      List<List<String>> measurements,
+      ByteBuffer[] insertValues) throws QueryProcessException {
+    super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+    this.deviceId = deviceId;
+    rowPlans = new InsertRowPlan[insertTimes.length];
+    for (int i = 0; i < insertTimes.length; i++) {
+      rowPlans[i] = new InsertRowPlan(deviceId, insertTimes[i], measurements.get(i).toArray(new String[0]), insertValues[i]);
+      if (rowPlans[i].getMeasurements().length == 0) {
+        throw new QueryProcessException(
+            "The measurements are null, deviceId:" + deviceId
+                + ", time:" + insertTimes[i]);
+      }
+      if (rowPlans[i].getValues().length == 0) {
+        throw new QueryProcessException(
+            "The size of values in InsertRowsOfOneDevicePlan is 0, deviceId:" + deviceId
+                + ", time:" + insertTimes[i]);
+      }
+    }
+  }
+
+  //TODO do we need to rewrite hashCode?
+
+  /** @return the distinct paths reported by the row plans */
+  @Override
+  public List<PartialPath> getPaths() {
+    Set<PartialPath> paths = new HashSet<>();
+    for (InsertRowPlan plan : rowPlans) {
+      paths.addAll(plan.getPaths());
+    }
+    return new ArrayList<>(paths);
+  }
+
+  /** @return the smallest row time, or Long.MAX_VALUE if there is no row */
+  @Override
+  public long getMinTime() {
+    long minTime = Long.MAX_VALUE;
+    for (InsertRowPlan plan : rowPlans) {
+      if (minTime > plan.getTime()) {
+        minTime = plan.getTime();
+      }
+    }
+    return minTime;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
+    stream.writeByte((byte) type);
+    putString(stream, deviceId.getFullPath());
+
+    stream.writeInt(rowPlans.length);
+    for (InsertRowPlan plan : rowPlans) {
+      stream.writeLong(plan.getTime());
+      plan.serializeMeasurementsAndValues(stream);
+    }
+  }
+
+  // Same layout as the stream variant. The type tag must be BATCH_INSERT_ONE_DEVICE (not INSERT);
+  // a reader that dispatches on the tag would presumably decode the bytes as a single-row plan.
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
+    buffer.put((byte) type);
+    putString(buffer, deviceId.getFullPath());
+    buffer.putInt(rowPlans.length);
+    for (InsertRowPlan plan : rowPlans) {
+      buffer.putLong(plan.getTime());
+      plan.serializeMeasurementsAndValues(buffer);
+    }
+  }
+
+  /** Reads device id, row count and rows; the leading type byte is not read here. */
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    this.deviceId = new PartialPath(readString(buffer));
+    this.rowPlans = new InsertRowPlan[buffer.getInt()];
+    for (int i = 0; i < rowPlans.length; i ++) {
+      rowPlans[i] = new InsertRowPlan();
+      rowPlans[i].setDeviceId(deviceId);
+      rowPlans[i].setTime(buffer.getLong());
+      rowPlans[i].deserializeMeasurementsAndValues(buffer);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "deviceId: " + deviceId + ", times: " + rowPlans.length ;
+  }
+
+  /** Null if super returns null; else keeps rows with failed values via their getPlanFromFailed(). */
+  @Override
+  public InsertPlan getPlanFromFailed() {
+    if (super.getPlanFromFailed() == null) {
+      return null;
+    }
+    List<InsertRowPlan> plans = new ArrayList<>();
+    for (InsertRowPlan plan : rowPlans) {
+      if (plan.hasFailedValues()) {
+        plans.add((InsertRowPlan) plan.getPlanFromFailed());
+      }
+    }
+    this.rowPlans = plans.toArray(new InsertRowPlan[0]);
+    return this;
+  }
+
+  public InsertRowPlan[] getRowPlans() {
+    return rowPlans;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 76f4f59..7f2bf8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -114,6 +115,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertOneDeviceRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
@@ -1256,14 +1258,47 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
     for (int i = 0; i < req.deviceIds.size(); i++) {
       try {
-        InsertRowPlan plan = new InsertRowPlan();
-        plan.setDeviceId(new PartialPath(req.getDeviceIds().get(i)));
-        plan.setTime(req.getTimestamps().get(i));
-        plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
-        plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
-        plan.setValues(new Object[plan.getMeasurements().length]);
-        plan.fillValues(req.valuesList.get(i));
-        plan.setNeedInferType(false);
+        InsertRowPlan plan = new InsertRowPlan(
+            new PartialPath(req.getDeviceIds().get(i)), req.getTimestamps().get(i),
+            req.getMeasurementsList().get(i).toArray(new String[0]), req.valuesList.get(i)
+        );
+        TSStatus status = checkAuthority(plan, req.getSessionId());
+        if (status != null) {
+          statusList.add(status);
+        } else {
+          statusList.add(executeNonQueryPlan(plan));
+        }
+      } catch (Exception e) {
+        logger.error("meet error when insert in batch", e);
+        statusList.add(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+      }
+    }
+
+    return RpcUtils.getStatus(statusList);
+  }
+
+  @Override
+  public TSStatus insertOneDeviceRecords(TSInsertOneDeviceRecordsReq req) throws TException {
+    if (auditLogger.isDebugEnabled()) {
+      auditLogger
+          .debug("Session {} insertRecords, device {}, first time {}", currSessionId.get(),
+              req.deviceId, req.getTimestamps().get(0));
+    }
+    if (!checkLogin(req.getSessionId())) {
+      logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+      return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+    }
+
+    List<TSStatus> statusList = new ArrayList<>();
+
+    for (int i = 0; i < req.measurementsList.size(); i++) {
+      try {
+        InsertRowsOfOneDevicePlan plan = new InsertRowsOfOneDevicePlan(
+            new PartialPath(req.getDeviceId()),
+            req.getTimestamps().toArray(new Long[0]),
+            req.getMeasurementsList(),
+            req.getValuesList().toArray(new ByteBuffer[0])
+        );
         TSStatus status = checkAuthority(plan, req.getSessionId());
         if (status != null) {
           statusList.add(status);
@@ -1348,6 +1383,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   @Override
+  public TSStatus testInsertOneDeviceRecords(TSInsertOneDeviceRecordsReq req) throws TException {
+    logger.debug("Test insert rows in batch request receive.");
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); // the request content is never read
+  }
+
+  @Override
   public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) throws TException {
     logger.debug("Test insert string records request receive.");
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -1364,14 +1405,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
       }
 
-      InsertRowPlan plan = new InsertRowPlan();
-      plan.setDeviceId(new PartialPath(req.getDeviceId()));
-      plan.setTime(req.getTimestamp());
-      plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
-      plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
-      plan.setValues(new Object[plan.getMeasurements().length]);
-      plan.fillValues(req.values);
-      plan.setNeedInferType(false);
+      InsertRowPlan plan = new InsertRowPlan(
+          new PartialPath(req.getDeviceId()), req.getTimestamp(),
+          req.getMeasurements().toArray(new String[0]), req.values
+      );
 
       TSStatus status = checkAuthority(plan, req.getSessionId());
       if (status != null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java
new file mode 100644
index 0000000..f7aed72
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlanTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.crud;
+
+public class InsertRowsOfOneDevicePlanTest {
+  // TODO: no test yet; e.g. check that serialize followed by deserialize keeps the rows
+
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index d4abb13..8e59b6f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +37,7 @@ import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertOneDeviceRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
@@ -379,6 +381,117 @@ public class Session {
     }
   }
 
+  /**
+   * Insert multiple rows of one device, which can reduce the overhead of network. This method is
+   * just like jdbc executeBatch, we pack some insert requests in batch and send them to server. If
+   * you want to improve your performance, please see insertTablet method
+   * <p>
+   * All rows share deviceId; each row has its own time and its own list of measurements. The
+   * times are not assumed to be sorted (haveSorted is passed as false).
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+     insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, false);
+  }
+  /**
+   * Insert multiple rows of one device, which can reduce the overhead of network. This method is
+   * just like jdbc executeBatch, we pack some insert requests in batch and send them to server. If
+   * you want to improve your performance, please see insertTablet method
+   * <p>
+   * All rows share deviceId; each row has its own time and its own list of measurements.
+   *
+   * @param haveSorted  whether times are already ascending; if false the given lists are reordered
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertOneDeviceRecordsReq request = genTSInsertOneDeviceRecordsReq(deviceId, times, measurementsList,
+        typesList, valuesList, haveSorted);
+    try {
+      RpcUtils.verifySuccess(client.insertOneDeviceRecords(request));
+    } catch (TException e) {
+      if (reconnect()) { // the request is sent a second time only if reconnect() reports success
+        try {
+          RpcUtils.verifySuccess(client.insertOneDeviceRecords(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(
+            "Fail to reconnect to server. Please check server status");
+      }
+    }
+  }
+
+  /** Checks the row lists, ensures ascending time order and packs the rows into one request. */
+  private TSInsertOneDeviceRecordsReq genTSInsertOneDeviceRecordsReq(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, BatchExecutionException {
+    // check params size: typesList is indexed per row as well (sorting and value encoding)
+    int len = times.size();
+    if (len != measurementsList.size() || len != typesList.size() || len != valuesList.size()) {
+      throw new IllegalArgumentException(
+          "times, measurementsList, typesList and valuesList's size should be equal");
+    }
+    if (haveSorted) {
+      if (!checkSorted(times)) {
+        throw new BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
+      }
+    } else {
+      sortRecordsInOneDevice(times, measurementsList, typesList, valuesList);
+    }
+
+    TSInsertOneDeviceRecordsReq request = new TSInsertOneDeviceRecordsReq();
+    request.setSessionId(sessionId);
+    request.setDeviceId(deviceId);
+    request.setTimestamps(times);
+    request.setMeasurementsList(measurementsList);
+    List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
+    request.setValuesList(buffersList);
+    return request;
+  }
+
+  /** Reorders all four lists in place so that row i of each list belongs to the i-th smallest time. */
+  private void sortRecordsInOneDevice(List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) {
+    Integer[] index = new Integer[times.size()];
+    for (int i = 0; i < times.size(); i++) {
+      index[i] = i;
+    }
+    // index must be computed before times itself is sorted, it refers to the original positions
+    Arrays.sort(index, Comparator.comparingLong(times::get));
+    times.sort(Long::compareTo);
+    // apply the same permutation to measurements, types and values
+    sortList(measurementsList, index);
+    sortList(typesList, index);
+    sortList(valuesList, index);
+  }
+
+  /** Rearranges source in place: position i receives the element formerly at index[i]. */
+  private <T> void sortList(List<T> source, Integer[] index) {
+    List<T> tmpLists = new ArrayList<>(source);
+    for (int i = 0; i < index.length; i++) {
+      source.set(i, tmpLists.get(index[i]));
+    }
+  }
+
+  /** Writes row i's values (with row i's types) into one buffer per row; each buffer is flipped. */
+  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList)
+      throws IoTDBConnectionException {
+    List<ByteBuffer> buffersList = new ArrayList<>();
+    for (int i = 0; i < valuesList.size(); i++) {
+      ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
+      putValues(typesList.get(i), valuesList.get(i), buffer);
+      buffer.flip();
+      buffersList.add(buffer);
+    }
+    return buffersList;
+  }
+
   private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList) throws IoTDBConnectionException {
@@ -394,13 +507,7 @@ public class Session {
     request.setDeviceIds(deviceIds);
     request.setTimestamps(times);
     request.setMeasurementsList(measurementsList);
-    List<ByteBuffer> buffersList = new ArrayList<>();
-    for (int i = 0; i < measurementsList.size(); i++) {
-      ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
-      putValues(typesList.get(i), valuesList.get(i), buffer);
-      buffer.flip();
-      buffersList.add(buffer);
-    }
+    List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
     request.setValuesList(buffersList);
     return request;
   }
@@ -1064,6 +1171,15 @@ public class Session {
     return true;
   }
 
+  /**
+   * Tells whether the given timestamps are in non-decreasing order.
+   *
+   * @param times the timestamps to inspect
+   * @return {@code false} as soon as an element is smaller than its predecessor, {@code true}
+   *     otherwise (including for lists with fewer than two elements)
+   */
+  private boolean checkSorted(List<Long> times) {
+    int next = 1;
+    while (next < times.size()) {
+      long current = times.get(next);
+      long previous = times.get(next - 1);
+      if (previous > current) {
+        return false;
+      }
+      next++;
+    }
+    return true;
+  }
+
   public void sortTablet(Tablet tablet) {
     /*
      * following part of code sort the batch data by time,
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 0f8bb56..72ac8d1 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -415,6 +415,63 @@ public class SessionPool {
 
 
   /**
+   * Inserts several records that all belong to the same device in one batched request, which
+   * reduces network overhead. This is similar to a JDBC batch insert: the insert requests are
+   * packed together and sent to the server in a single call.
+   *
+   * <p>This overload passes {@code false} for {@code haveSorted}; see the six-argument overload
+   * for session handling and retry behaviour.
+   *
+   * <p>If you want to improve performance further, see the insertTablet method.
+   *
+   * @param deviceId the device all records belong to
+   * @param times timestamp of each record
+   * @param measurementsList measurement names of each record
+   * @param typesList data types of each record
+   * @param valuesList values of each record
+   * @throws IoTDBConnectionException propagated from the six-argument overload
+   * @throws StatementExecutionException propagated from the six-argument overload
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException {
+    // Single place for the borrow/retry/put-back logic: delegate instead of duplicating the loop.
+    insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, false);
+  }
+
+  /**
+   * Inserts several records that all belong to the same device in one batched request, which
+   * reduces network overhead. This is similar to a JDBC batch insert: the insert requests are
+   * packed together and sent to the server in a single call.
+   *
+   * <p>A session is taken from the pool for every attempt, at most {@code RETRY} times. After a
+   * successful insert, or after a {@link StatementExecutionException} or
+   * {@link RuntimeException}, the session is put back into the pool.
+   *
+   * <p>If you want to improve performance further, see the insertTablet method.
+   *
+   * @param haveSorted whether the times list has been ordered.
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      Session pooled = getSession();
+      try {
+        pooled.insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, haveSorted);
+        putBack(pooled);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // A connection failure: hand the session to the cleanup helper, which (judging by its
+        // name) may rethrow; otherwise the loop tries again with another session.
+        logger.warn("insertOneDeviceRecords failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(pooled, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // The session itself is still usable, so return it before propagating the error.
+        putBack(pooled);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+
+  /**
    * Insert data in batch format, which can reduce the overhead of network. This method is just like
    * jdbc batch insert, we pack some insert request in batch and send them to server If you want
    * improve your performance, please see insertTablet method
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
index ad59aea..a0b6123 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java
@@ -779,4 +779,5 @@ public class IoTDBSessionComplexIT {
       Assert.assertEquals(700, count);
     }
   }
+
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
index 09849c8..f220234 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
@@ -388,4 +388,66 @@ public class IoTDBSessionSimpleIT {
     session.deleteStorageGroup(storageGroup);
     session.close();
   }
+
+  /**
+   * Sends three records with different measurement sets to device {@code root.sg.d1} through a
+   * single insertOneDeviceRecords call. The test passes when no exception is thrown.
+   */
+  @Test
+  public void testInsertOneDeviceRecords()
+      throws IoTDBConnectionException, StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+
+    List<Long> times = new ArrayList<>();
+    List<List<String>> measurements = new ArrayList<>();
+    List<List<TSDataType>> datatypes = new ArrayList<>();
+    List<List<Object>> values = new ArrayList<>();
+
+    // record at time 1: s1 and s2, both INT32
+    List<String> firstMeasurements = new ArrayList<>();
+    firstMeasurements.add("s1");
+    firstMeasurements.add("s2");
+    List<TSDataType> firstTypes = new ArrayList<>();
+    firstTypes.add(TSDataType.INT32);
+    firstTypes.add(TSDataType.INT32);
+    List<Object> firstValues = new ArrayList<>();
+    firstValues.add(1);
+    firstValues.add(2);
+    times.add(1L);
+    measurements.add(firstMeasurements);
+    datatypes.add(firstTypes);
+    values.add(firstValues);
+
+    // record at time 2: s2 (INT32) and s3 (INT64)
+    List<String> secondMeasurements = new ArrayList<>();
+    secondMeasurements.add("s2");
+    secondMeasurements.add("s3");
+    List<TSDataType> secondTypes = new ArrayList<>();
+    secondTypes.add(TSDataType.INT32);
+    secondTypes.add(TSDataType.INT64);
+    List<Object> secondValues = new ArrayList<>();
+    secondValues.add(3);
+    secondValues.add(4L);
+    times.add(2L);
+    measurements.add(secondMeasurements);
+    datatypes.add(secondTypes);
+    values.add(secondValues);
+
+    // record at time 3: s4 (FLOAT) and s5 (BOOLEAN)
+    List<String> thirdMeasurements = new ArrayList<>();
+    thirdMeasurements.add("s4");
+    thirdMeasurements.add("s5");
+    List<TSDataType> thirdTypes = new ArrayList<>();
+    thirdTypes.add(TSDataType.FLOAT);
+    thirdTypes.add(TSDataType.BOOLEAN);
+    List<Object> thirdValues = new ArrayList<>();
+    thirdValues.add(5.0f);
+    thirdValues.add(Boolean.TRUE);
+    times.add(3L);
+    measurements.add(thirdMeasurements);
+    datatypes.add(thirdTypes);
+    values.add(thirdValues);
+
+    session.insertOneDeviceRecords("root.sg.d1", times, measurements, datatypes, values);
+    session.close();
+  }
 }
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 4f412bf..45eda3d 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -225,6 +225,14 @@ struct TSInsertRecordsReq {
     5: required list<i64> timestamps
 }
 
+// Request for inserting several records that all belong to one device.
+// The Java session fills measurementsList, valuesList and timestamps from lists that it
+// reads index by index, so entry i of each list is expected to describe the same record.
+// NOTE(review): confirm the server-side handler relies on the same alignment.
+struct TSInsertOneDeviceRecordsReq {
+    1: required i64 sessionId
+    // the single device every record in this request belongs to
+    2: required string deviceId
+    // measurement names of each record
+    3: required list<list<string>> measurementsList
+    // one binary buffer per record holding that record's encoded values
+    4: required list<binary> valuesList
+    // timestamp of each record
+    5: required list<i64> timestamps
+}
+
 struct TSInsertStringRecordsReq {
     1: required i64 sessionId
     2: required list<string> deviceIds
@@ -326,6 +334,8 @@ service TSIService {
 
 	TSStatus insertRecords(1:TSInsertRecordsReq req);
 
+	TSStatus insertOneDeviceRecords(1:TSInsertOneDeviceRecordsReq req);
+
 	TSStatus insertStringRecords(1:TSInsertStringRecordsReq req);
 
 	TSStatus testInsertTablet(1:TSInsertTabletReq req);
@@ -338,6 +348,8 @@ service TSIService {
 
   TSStatus testInsertRecords(1:TSInsertRecordsReq req);
 
+  TSStatus testInsertOneDeviceRecords(1:TSInsertOneDeviceRecordsReq req);
+
   TSStatus testInsertStringRecords(1:TSInsertStringRecordsReq req);
 
 	TSStatus deleteData(1:TSDeleteDataReq req);