You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/12/31 04:30:04 UTC

[iotdb] 01/01: one device api

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch session_one_device_api
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6fa4f61de545f36bbfd7161012b50874e3c21b43
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Thu Dec 31 12:29:11 2020 +0800

    one device api
---
 docs/UserGuide/Client/Programming - Native API.md  |  18 +++
 .../UserGuide/Client/Programming - Native API.md   |  14 ++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  13 ++
 .../engine/storagegroup/StorageGroupProcessor.java |  44 ++++++
 .../apache/iotdb/db/qp/executor/IPlanExecutor.java |   8 ++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  59 +++++++++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   3 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   2 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  25 ++++
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 147 +++++++++++++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  69 +++++++---
 .../java/org/apache/iotdb/session/Session.java     | 125 +++++++++++++++++-
 .../apache/iotdb/session/SessionConnection.java    |  21 +++
 .../org/apache/iotdb/session/pool/SessionPool.java |  55 ++++++++
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java | 130 ++++++++++++++++++
 15 files changed, 707 insertions(+), 26 deletions(-)

diff --git a/docs/UserGuide/Client/Programming - Native API.md b/docs/UserGuide/Client/Programming - Native API.md
index a1c6791..1f8358f 100644
--- a/docs/UserGuide/Client/Programming - Native API.md	
+++ b/docs/UserGuide/Client/Programming - Native API.md	
@@ -150,6 +150,14 @@ Here we show the commonly used interfaces and their parameters in the Native API
         List<List<String>> measurementsList, List<List<TSDataType>> typesList,
         List<List<Object>> valuesList)
   ```
+* Insert multiple Records that belong to the same device.
+  With type info, the server does not need to do type inference, which leads to better performance.
+  
+  ```
+  void insertRecordsOfOneDevice(String deviceId, List<Long> times,
+        List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+        List<List<Object>> valuesList)
+  ```
   
 * Raw data query. Time interval include startTime and exclude endTime
 
@@ -184,6 +192,16 @@ Here we show the commonly used interfaces and their parameters in the Native API
         List<List<Object>> valuesList)
   ```
 
+* Test the network and client cost of insertRecordsOfOneDevice.
+This method does NOT insert data into the database; the server simply returns after accepting the request.
+It should be used to measure the remaining time cost on the client side.
+
+  ```
+  void testInsertRecordsOfOneDevice(String deviceId, List<Long> times,
+        List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+        List<List<Object>> valuesList)
+  ```  
+
 * Test the network and client cost of insertRecord. This method NOT insert data into database and server just return after accept the request, this method should be used to test other time cost in client
 
   ```
diff --git a/docs/zh/UserGuide/Client/Programming - Native API.md b/docs/zh/UserGuide/Client/Programming - Native API.md
index affd9cd..560f222 100644
--- a/docs/zh/UserGuide/Client/Programming - Native API.md	
+++ b/docs/zh/UserGuide/Client/Programming - Native API.md	
@@ -146,7 +146,14 @@
         List<List<String>> measurementsList, List<List<TSDataType>> typesList,
         List<List<Object>> valuesList)
   ```
+  
+* 插入同属于一个device的多个 Record。
 
+  ```
+  void insertRecordsOfOneDevice(String deviceId, List<Long> times,
+        List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+        List<List<Object>> valuesList)
+  ```
 
 * 原始数据查询。时间间隔包含开始时间,不包含结束时间
 
@@ -181,6 +188,13 @@
           List<List<Object>> valuesList)
     ```
 
+* 测试 insertRecordsOfOneDevice,不实际写入数据,只将数据传输到 server 即返回。
+
+    ```
+    void testInsertRecordsOfOneDevice(String deviceId, List<Long> times,
+          List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+          List<List<Object>> valuesList)
+    ```
 
 * 测试 insertRecord,不实际写入数据,只将数据传输到 server 即返回。
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 781836f..388ff50 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -404,6 +405,18 @@ public class StorageEngine implements IService {
     }
   }
 
+  /**
+   * insert the rows of an InsertRowsOfOneDevicePlan through the processor of the plan's device
+   *
+   * @param insertRowsOfOneDevicePlan plan handed over unchanged to the storage group processor
+   * @throws StorageEngineException wraps any WriteProcessException raised by the processor
+   */
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws StorageEngineException {
+    // the processor is looked up by the device id carried by the plan
+    final StorageGroupProcessor processor =
+        getProcessor(insertRowsOfOneDevicePlan.getDeviceId());
+
+    // TODO monitor: update statistics
+    try {
+      processor.insert(insertRowsOfOneDevicePlan);
+    } catch (WriteProcessException writeFailure) {
+      throw new StorageEngineException(writeFailure);
+    }
+  }
+
   /**
    * insert a InsertTabletPlan to a storage group
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a2d6a5f..a9fae6a 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -76,6 +76,7 @@ import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -2503,6 +2504,49 @@ public class StorageGroupProcessor {
     return tsFileManagement;
   }
 
+  /**
+   * Insert every row of an InsertRowsOfOneDevicePlan while holding the write lock.
+   *
+   * <p>Rows for which isAlive(time) is false are skipped. A row is treated as sequence data once
+   * its time is greater than the latest flushed time recorded for the device in its time
+   * partition. The rows are expected to be ordered by time (NOTE(review): ordering is done by the
+   * caller - confirm), so after the first sequence row all following rows are written as sequence
+   * data too.
+   *
+   * <p>When discarding of out-of-order data is enabled, only the out-of-order rows are dropped.
+   * Later rows of the same plan are still examined and written.
+   *
+   * @param insertRowsOfOneDevicePlan the rows to write, all targeting one device
+   * @throws WriteProcessException declared for failures while writing a row
+   */
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws WriteProcessException {
+    if (enableMemControl) {
+      StorageEngine.blockInsertionIfReject();
+    }
+    writeLock();
+    try {
+      boolean isSequence = false;
+      for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        if (!isAlive(plan.getTime())) {
+          // we do not need to write this part of the data, as it can not be queried
+          continue;
+        }
+        // init map
+        long timePartitionId = StorageEngine.getTimePartition(plan.getTime());
+
+        partitionLatestFlushedTimeForEachDevice
+            .computeIfAbsent(timePartitionId, id -> new HashMap<>());
+        // as the plans have been ordered and we hold the write lock,
+        // once a plan is sequenced all the remaining plans are sequenced as well
+        if (!isSequence) {
+          isSequence =
+              plan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId)
+                  .getOrDefault(plan.getDeviceId().getFullPath(), Long.MIN_VALUE);
+        }
+        // unsequence row and the user configured out-of-order data to be discarded:
+        // drop only this row; returning here would also lose later, in-order rows
+        if (!isSequence && IoTDBDescriptor.getInstance().getConfig()
+            .isEnableDiscardOutOfOrderData()) {
+          continue;
+        }
+
+        latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
+        // insert to sequence or unSequence file
+        insertToTsFileProcessor(plan, isSequence, plan.getTime());
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   private enum LoadTsFileType {
     LOAD_SEQUENCE, LOAD_UNSEQUENCE
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index 50fe869..b2b68a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -92,6 +93,13 @@ public interface IPlanExecutor {
   void insert(InsertRowPlan insertRowPlan) throws QueryProcessException;
 
   /**
+   * Execute a plan that inserts several rows into one device. Nothing is returned; a failure is
+   * reported through the declared exception.
+   *
+   * @param insertRowsOfOneDevicePlan physical insert plan
+   * @throws QueryProcessException declared for execution failures; see the implementation for
+   *     the concrete causes
+   */
+  void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) throws QueryProcessException;
+
+  /**
    * execute batch insert plan
    *
    * @throws BatchProcessException when some of the rows failed
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 7130bdb..a8decd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -104,6 +104,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
@@ -210,6 +211,9 @@ public class PlanExecutor implements IPlanExecutor {
       case INSERT:
         insert((InsertRowPlan) plan);
         return true;
+      case BATCH_INSERT_ONE_DEVICE:
+        insert((InsertRowsOfOneDevicePlan) plan);
+        return true;
       case BATCHINSERT:
         insertTablet((InsertTabletPlan) plan);
         return true;
@@ -993,10 +997,65 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   @Override
+  public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
+      throws QueryProcessException {
+    try {
+      // step 1: resolve the schema of every row; the types come with the request,
+      // so no type inference is performed for this kind of plan
+      for (InsertRowPlan rowPlan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        int measurementCount = rowPlan.getMeasurements().length;
+        rowPlan.setMeasurementMNodes(new MeasurementMNode[measurementCount]);
+        getSeriesSchemas(rowPlan);
+      }
+
+      // step 2: hand the whole plan to the storage engine
+      StorageEngine.getInstance().insert(insertRowsOfOneDevicePlan);
+
+      // step 3: collect the measurements that failed, split by kind of failure
+      List<String> missingPaths = new ArrayList<>();
+      List<String> otherFailures = new ArrayList<>();
+      for (InsertRowPlan rowPlan : insertRowsOfOneDevicePlan.getRowPlans()) {
+        List<String> failedOfRow = rowPlan.getFailedMeasurements();
+        if (failedOfRow == null) {
+          continue;
+        }
+        // a row counts as "path not exist" only if the root cause of every failure says so
+        boolean onlyMissingPaths = true;
+        for (Exception failure : rowPlan.getFailedExceptions()) {
+          Throwable rootCause = failure;
+          while (rootCause.getCause() != null) {
+            rootCause = rootCause.getCause();
+          }
+          if (!(rootCause instanceof PathNotExistException)) {
+            onlyMissingPaths = false;
+            break;
+          }
+        }
+        (onlyMissingPaths ? missingPaths : otherFailures).addAll(failedOfRow);
+      }
+
+      // missing paths take precedence over any other kind of failure
+      if (!missingPaths.isEmpty()) {
+        throw new PathNotExistException(missingPaths);
+      }
+      if (!otherFailures.isEmpty()) {
+        throw new StorageEngineException(
+            "failed to insert points " + otherFailures);
+      }
+    } catch (StorageEngineException | MetadataException e) {
+      throw new QueryProcessException(e);
+    }
+  }
+
+  @Override
   public void insert(InsertRowPlan insertRowPlan) throws QueryProcessException {
     try {
       insertRowPlan
           .setMeasurementMNodes(new MeasurementMNode[insertRowPlan.getMeasurements().length]);
+      // check whether types are match
       getSeriesSchemas(insertRowPlan);
       insertRowPlan.transferType();
       StorageEngine.getInstance().insert(insertRowPlan);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index d713fc8..fad59aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -80,6 +80,7 @@ public abstract class Operator {
     UDAF, UDTF, CREATE_FUNCTION, DROP_FUNCTION,
     CREATE_MULTI_TIMESERIES, CREATE_INDEX, DROP_INDEX, QUERY_INDEX,
     CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE,
-    MEASUREMENT_MNODE, STORAGE_GROUP_MNODE;
+    MEASUREMENT_MNODE, STORAGE_GROUP_MNODE,
+    BATCH_INSERT_ONE_DEVICE;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index d40f1eb..33df785 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -361,7 +361,7 @@ public abstract class PhysicalPlan {
     REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
     DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES,
     ALTER_TIMESERIES, FLUSH, CREATE_INDEX, DROP_INDEX,
-    CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE
+    CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE,BATCH_INSERT_ONE_DEVICE
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index c3c7a7d..ec32f5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -90,6 +90,18 @@ public class InsertRowPlan extends InsertPlan {
     isNeedInferType = true;
   }
 
+  public InsertRowPlan(PartialPath deviceId, long insertTime, String[] measurementList,
+      ByteBuffer  values) throws QueryProcessException {
+    super(Operator.OperatorType.INSERT);
+    this.time = insertTime;
+    this.deviceId = deviceId;
+    this.measurements = measurementList;
+    this.dataTypes = new TSDataType[measurementList.length];
+    this.values = new Object[measurementList.length];
+    this.fillValues(values);
+    isNeedInferType = false;
+  }
+
   @TestOnly
   public InsertRowPlan(PartialPath deviceId, long insertTime, String[] measurements,
       TSDataType[] dataTypes, String[] insertValues) {
@@ -256,7 +268,10 @@ public class InsertRowPlan extends InsertPlan {
     stream.writeLong(time);
 
     putString(stream, deviceId.getFullPath());
+    serializeMeasurementsAndValues(stream);
+  }
 
+  void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException {
     stream.writeInt(
         measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
@@ -400,7 +415,10 @@ public class InsertRowPlan extends InsertPlan {
     buffer.putLong(time);
 
     putString(buffer, deviceId.getFullPath());
+    serializeMeasurementsAndValues(buffer);
+  }
 
+  void serializeMeasurementsAndValues(ByteBuffer buffer) {
     buffer
         .putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
@@ -425,7 +443,10 @@ public class InsertRowPlan extends InsertPlan {
   public void deserialize(ByteBuffer buffer) throws IllegalPathException {
     this.time = buffer.getLong();
     this.deviceId = new PartialPath(readString(buffer));
+    deserializeMeasurementsAndValues(buffer);
+  }
 
+  void deserializeMeasurementsAndValues(ByteBuffer buffer) {
     int measurementSize = buffer.getInt();
 
     this.measurements = new String[measurementSize];
@@ -451,6 +472,10 @@ public class InsertRowPlan extends InsertPlan {
         .toString(measurements) + ", values: " + Arrays.toString(values);
   }
 
+  /** Tells whether failedValues is present and holds at least one element. */
+  boolean hasFailedValues() {
+    if (failedValues == null) {
+      return false;
+    }
+    return !failedValues.isEmpty();
+  }
+
   public TimeValuePair composeTimeValuePair(int measurementIndex) {
     if (measurementIndex >= values.length) {
       return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
new file mode 100644
index 0000000..9dd5806
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.crud;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+
+/**
+ * Physical plan that bundles several InsertRowPlans which all target the same device.
+ *
+ * <p>Serialized layout (both serialize variants): one type byte, the device path, the number of
+ * rows, then per row its time followed by the row's measurements and values.
+ */
+public class InsertRowsOfOneDevicePlan extends InsertPlan {
+
+  private InsertRowPlan[] rowPlans;
+
+  /**
+   * Build one InsertRowPlan per entry of insertTimes.
+   *
+   * @param deviceId device shared by all rows
+   * @param insertTimes timestamps, one per row
+   * @param measurements measurement names of each row, indexed like insertTimes
+   * @param insertValues value buffers of each row, indexed like insertTimes
+   * @throws QueryProcessException if a row has no measurements or no values, or if building a
+   *     row plan fails
+   */
+  public InsertRowsOfOneDevicePlan(PartialPath deviceId, Long[] insertTimes,
+      List<List<String>> measurements,
+      ByteBuffer[] insertValues) throws QueryProcessException {
+    super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+    this.deviceId = deviceId;
+    rowPlans = new InsertRowPlan[insertTimes.length];
+    for (int i = 0; i < insertTimes.length; i++) {
+      rowPlans[i] = new InsertRowPlan(deviceId, insertTimes[i],
+          measurements.get(i).toArray(new String[0]), insertValues[i]);
+      if (rowPlans[i].getMeasurements().length == 0) {
+        throw new QueryProcessException(
+            "The measurements are null, deviceId:" + deviceId
+                + ", time:" + insertTimes[i]);
+      }
+      if (rowPlans[i].getValues().length == 0) {
+        throw new QueryProcessException(
+            "The size of values in InsertRowsOfOneDevicePlan is 0, deviceId:" + deviceId
+                + ", time:" + insertTimes[i]);
+      }
+    }
+  }
+
+  //TODO I see InsertRowPlan rewrites the hashCode, but do we need to rewrite hashCode?
+
+  /** Returns the distinct paths reported by all row plans; the order is not defined. */
+  @Override
+  public List<PartialPath> getPaths() {
+    Set<PartialPath> paths = new HashSet<>();
+    for (InsertRowPlan plan : rowPlans) {
+      paths.addAll(plan.getPaths());
+    }
+    return new ArrayList<>(paths);
+  }
+
+  /** Returns the smallest row time, or Long.MAX_VALUE when there are no rows. */
+  @Override
+  public long getMinTime() {
+    long minTime = Long.MAX_VALUE;
+    for (InsertRowPlan plan : rowPlans) {
+      if (minTime > plan.getTime()) {
+        minTime = plan.getTime();
+      }
+    }
+    return minTime;
+  }
+
+  /** Writes the plan to the stream, tagged as BATCH_INSERT_ONE_DEVICE. */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
+    stream.writeByte((byte) type);
+    putString(stream, deviceId.getFullPath());
+
+    stream.writeInt(rowPlans.length);
+    for (InsertRowPlan plan : rowPlans) {
+      stream.writeLong(plan.getTime());
+      plan.serializeMeasurementsAndValues(stream);
+    }
+  }
+
+  /**
+   * Writes the plan to the buffer. The type tag must be BATCH_INSERT_ONE_DEVICE, the same tag the
+   * stream variant writes: tagging it as INSERT would make a reader treat the bytes as a single
+   * InsertRowPlan, whose layout starts with the time instead of the device path.
+   */
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
+    buffer.put((byte) type);
+
+    putString(buffer, deviceId.getFullPath());
+
+    buffer.putInt(rowPlans.length);
+    for (InsertRowPlan plan : rowPlans) {
+      buffer.putLong(plan.getTime());
+      plan.serializeMeasurementsAndValues(buffer);
+    }
+  }
+
+  /**
+   * Reads the device path, the row count and the rows from the buffer. The type byte is not read
+   * here, so the buffer must already be positioned after it.
+   */
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    this.deviceId = new PartialPath(readString(buffer));
+    this.rowPlans = new InsertRowPlan[buffer.getInt()];
+    for (int i = 0; i < rowPlans.length; i++) {
+      rowPlans[i] = new InsertRowPlan();
+      rowPlans[i].setDeviceId(deviceId);
+      rowPlans[i].setTime(buffer.getLong());
+      rowPlans[i].deserializeMeasurementsAndValues(buffer);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "deviceId: " + deviceId + ", times: " + rowPlans.length;
+  }
+
+  /**
+   * Shrinks this plan to the rows that still carry failed values and returns this plan, or
+   * returns null when the superclass reports nothing to retry.
+   * NOTE(review): depends on what InsertPlan#getPlanFromFailed checks - confirm.
+   */
+  @Override
+  public InsertPlan getPlanFromFailed() {
+    if (super.getPlanFromFailed() == null) {
+      return null;
+    }
+    List<InsertRowPlan> plans = new ArrayList<>();
+    for (InsertRowPlan plan : rowPlans) {
+      if (plan.hasFailedValues()) {
+        plans.add((InsertRowPlan) plan.getPlanFromFailed());
+      }
+    }
+    this.rowPlans = plans.toArray(new InsertRowPlan[0]);
+    return this;
+  }
+
+  /** Returns the internal array of row plans (not a copy). */
+  public InsertRowPlan[] getRowPlans() {
+    return rowPlans;
+  }
+
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index e423b14..2144f5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -119,6 +120,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -1240,14 +1242,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     boolean isAllSuccessful = true;
     for (int i = 0; i < req.deviceIds.size(); i++) {
       try {
-        InsertRowPlan plan = new InsertRowPlan();
-        plan.setDeviceId(new PartialPath(req.getDeviceIds().get(i)));
-        plan.setTime(req.getTimestamps().get(i));
-        plan.setMeasurements(req.getMeasurementsList().get(i).toArray(new String[0]));
-        plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
-        plan.setValues(new Object[plan.getMeasurements().length]);
-        plan.fillValues(req.valuesList.get(i));
-        plan.setNeedInferType(false);
+        InsertRowPlan plan = new InsertRowPlan(
+            new PartialPath(req.getDeviceIds().get(i)), req.getTimestamps().get(i),
+            req.getMeasurementsList().get(i).toArray(new String[0]), req.valuesList.get(i)
+        );
         TSStatus status = checkAuthority(plan, req.getSessionId());
         if (status == null) {
           status = executeNonQueryPlan(plan);
@@ -1275,6 +1273,41 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   @Override
+  /**
+   * Handle a request that inserts several records into one device.
+   *
+   * <p>The request is turned into a single InsertRowsOfOneDevicePlan, checked for authority and
+   * executed. Any exception raised on the way is logged and reported as INTERNAL_SERVER_ERROR.
+   *
+   * @param req thrift request carrying the device id, timestamps, measurements and values
+   * @return NOT_LOGIN_ERROR when the session is not logged in; otherwise the status built by
+   *     RpcUtils.getStatus from the single collected status
+   */
+  public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) throws TException {
+    if (auditLogger.isDebugEnabled()) {
+      // an empty (or missing) timestamp list must not make audit logging throw
+      List<Long> timestamps = req.getTimestamps();
+      Long firstTime =
+          (timestamps == null || timestamps.isEmpty()) ? null : timestamps.get(0);
+      auditLogger
+          .debug("Session {} insertRecordsOfOneDevice, device {}, first time {}",
+              currSessionId.get(), req.deviceId, firstTime);
+    }
+    if (!checkLogin(req.getSessionId())) {
+      logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+      return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+    }
+
+    List<TSStatus> statusList = new ArrayList<>();
+
+    try {
+      InsertRowsOfOneDevicePlan plan = new InsertRowsOfOneDevicePlan(
+          new PartialPath(req.getDeviceId()),
+          req.getTimestamps().toArray(new Long[0]),
+          req.getMeasurementsList(),
+          req.getValuesList().toArray(new ByteBuffer[0])
+      );
+      TSStatus status = checkAuthority(plan, req.getSessionId());
+      if (status != null) {
+        // a non-null status from checkAuthority is returned as-is and the plan is not executed
+        statusList.add(status);
+      } else {
+        statusList.add(executeNonQueryPlan(plan));
+      }
+    } catch (Exception e) {
+      logger.error("meet error when insert in batch", e);
+      statusList.add(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+    }
+
+    return RpcUtils.getStatus(statusList);
+  }
+
+  @Override
   public TSStatus insertStringRecords(TSInsertStringRecordsReq req) throws TException {
     if (auditLogger.isDebugEnabled()) {
       auditLogger
@@ -1355,6 +1388,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   @Override
+  public TSStatus testInsertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) throws TException {
+    // test endpoint: the request content is never looked at, only acknowledged
+    logger.debug("Test insert rows in batch request receive.");
+    final TSStatus acknowledged = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    return acknowledged;
+  }
+
+  @Override
   public TSStatus testInsertStringRecords(TSInsertStringRecordsReq req) throws TException {
     logger.debug("Test insert string records request receive.");
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
@@ -1370,16 +1409,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
         return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
       }
-
-      InsertRowPlan plan = new InsertRowPlan();
-      plan.setDeviceId(new PartialPath(req.getDeviceId()));
-      plan.setTime(req.getTimestamp());
-      plan.setMeasurements(req.getMeasurements().toArray(new String[0]));
-      plan.setDataTypes(new TSDataType[plan.getMeasurements().length]);
-      plan.setValues(new Object[plan.getMeasurements().length]);
-      plan.fillValues(req.values);
-      plan.setNeedInferType(false);
-
+      InsertRowPlan plan = new InsertRowPlan(
+          new PartialPath(req.getDeviceId()), req.getTimestamp(),
+          req.getMeasurements().toArray(new String[0]), req.values
+      );
       TSStatus status = checkAuthority(plan, req.getSessionId());
       if (status != null) {
         return status;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 472dbe3..18275cf 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -32,12 +32,14 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -53,6 +55,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -558,6 +561,112 @@ public class Session {
     }
   }
 
+  /**
+   * Insert multiple rows of a single device in one request, which reduces the overhead of network.
+   * This method is just like jdbc executeBatch, we pack some insert requests in a batch and send
+   * them to the server. If you want to improve your performance further, please see the
+   * insertTablet method.
+   * <p>
+   * All rows belong to the given deviceId; each row may have a different time and a different
+   * set of measurements. This overload passes haveSorted = false to the six-argument overload.
+   *
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecordsOfOneDevice(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
+  }
+  /**
+   * Insert multiple rows of a single device in one request, which reduces the overhead of network.
+   * This method is just like jdbc executeBatch, we pack some insert requests in a batch and send
+   * them to the server. If you want to improve your performance further, please see the
+   * insertTablet method.
+   * <p>
+   * All rows belong to the given deviceId; each row may have a different time and a different
+   * set of measurements. The i-th entries of times, measurementsList, typesList and valuesList
+   * together describe one row.
+   *
+   * @param haveSorted whether the times have already been sorted in ascending order; when false
+   *                   the rows are ordered by time while the request is built, when true times
+   *                   that are not ascending are rejected with a BatchExecutionException
+   * @throws IllegalArgumentException if the four lists do not have the same size
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecordsOfOneDevice(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    // fail fast with a message that names the parameters this method really has
+    int len = times.size();
+    if (len != measurementsList.size() || len != typesList.size() || len != valuesList.size()) {
+      throw new IllegalArgumentException(
+          "times, measurementsList, typesList and valuesList's size should be equal");
+    }
+    TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times,
+        measurementsList, typesList, valuesList, haveSorted);
+    try {
+      getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
+    } catch (RedirectException e) {
+      // pass the endpoint carried by the exception on to handleRedirection for this device
+      handleRedirection(deviceId, e.getEndPoint());
+    }
+  }
+
+  /**
+   * Build the request for inserting rows of one device. The rows are placed in the request in
+   * non-descending time order.
+   * <p>
+   * The lists passed in are never modified: when sorting is needed, reordered copies are used.
+   * Sorting times in place while only re-binding the other lists would leave the caller with
+   * times that no longer line up with measurementsList, typesList and valuesList, so a second
+   * call with the same arguments (for example a retry) would pair rows with the wrong times.
+   *
+   * @param haveSorted if true, times must already be ascending and is only verified; if false,
+   *                   the rows are sorted by time here
+   * @throws IllegalArgumentException if the four lists do not have the same size
+   * @throws BatchExecutionException if haveSorted is true but times is not in ascending order
+   */
+  private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId,
+      List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted)
+      throws IoTDBConnectionException, BatchExecutionException {
+    // check params size
+    int len = times.size();
+    if (len != measurementsList.size() || len != typesList.size() || len != valuesList.size()) {
+      throw new IllegalArgumentException(
+          "times, measurementsList, typesList and valuesList's size should be equal");
+    }
+
+    if (haveSorted) {
+      if (!checkSorted(times)) {
+        throw new BatchExecutionException(
+            "Times in InsertOneDeviceRecords are not in ascending order");
+      }
+    } else {
+      // index[k] = original position of the row with the k-th smallest time; Arrays.sort on
+      // objects is stable, so rows with equal times keep their relative order
+      Integer[] index = new Integer[len];
+      // index2 = inverse of index: index2[i] is the sorted position of original row i, which is
+      // the form sortList expects
+      Integer[] index2 = new Integer[len];
+      for (int i = 0; i < len; i++) {
+        index2[i] = index[i] = i;
+      }
+      Arrays.sort(index, Comparator.comparingLong(times::get));
+      Arrays.sort(index2, Comparator.comparingInt(x -> index[x]));
+      // reorder copies of all four lists with the same permutation; the caller's lists stay as
+      // they were
+      times = sortList(times, index2);
+      measurementsList = sortList(measurementsList, index2);
+      typesList = sortList(typesList, index2);
+      valuesList = sortList(valuesList, index2);
+    }
+
+    TSInsertRecordsOfOneDeviceReq request = new TSInsertRecordsOfOneDeviceReq();
+    request.setDeviceId(deviceId);
+    request.setTimestamps(times);
+    request.setMeasurementsList(measurementsList);
+    List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
+    request.setValuesList(buffersList);
+    return request;
+  }
+
+  /**
+   * Return a reordered copy of source in which the element at position i is moved to position
+   * index[i]. source itself is not modified.
+   *
+   * @param index the target position of every element; expected to be a permutation of
+   *              0 .. source.size() - 1
+   */
+  private <T> List<T> sortList(List<T> source, Integer[] index) {
+    // start from a copy so the result has the right size, then overwrite every slot
+    List<T> result = new ArrayList<>(source);
+    for (int i = 0; i < index.length; i++) {
+      result.set(index[i], source.get(i));
+    }
+    return result;
+  }
+
+  /**
+   * Serialize every row of valuesList into its own ByteBuffer, using the data types at the same
+   * position in typesList, and return the buffers in row order. Each buffer is allocated with
+   * the size reported by calculateLength and filled by putValues.
+   * <p>
+   * NOTE(review): the inline loop that this helper replaces in the multi-device request builder
+   * (the one calling request.setDeviceIds) did not call flip() after putValues and iterated over
+   * measurementsList.size() instead of valuesList.size(). putValues is not shown here; if it
+   * already flips the buffer, the extra flip() below leaves the buffer with limit 0, i.e. no
+   * readable bytes. TODO confirm against putValues.
+   */
+  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList)
+      throws IoTDBConnectionException {
+    List<ByteBuffer> buffersList = new ArrayList<>();
+    for (int i = 0; i < valuesList.size(); i++) {
+      ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
+      putValues(typesList.get(i), valuesList.get(i), buffer);
+      buffer.flip();
+      buffersList.add(buffer);
+    }
+    return buffersList;
+  }
+
+
   private void insertRecordsWithLeaderCache(List<String> deviceIds, List<Long> times,
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
@@ -593,12 +702,7 @@ public class Session {
     request.setDeviceIds(deviceIds);
     request.setTimestamps(times);
     request.setMeasurementsList(measurementsList);
-    List<ByteBuffer> buffersList = new ArrayList<>();
-    for (int i = 0; i < measurementsList.size(); i++) {
-      ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
-      putValues(typesList.get(i), valuesList.get(i), buffer);
-      buffersList.add(buffer);
-    }
+    List<ByteBuffer> buffersList = objectValuesListToByteBufferList(valuesList, typesList);
     request.setValuesList(buffersList);
     return request;
   }
@@ -1019,6 +1123,15 @@ public class Session {
     return true;
   }
 
+  /**
+   * Check whether times is in non-descending order.
+   *
+   * @return false as soon as an element is smaller than its predecessor, true otherwise
+   *         (including for empty and single-element lists)
+   */
+  private boolean checkSorted(List<Long> times) {
+    int pos = 1;
+    while (pos < times.size()) {
+      if (times.get(pos - 1) > times.get(pos)) {
+        return false;
+      }
+      pos++;
+    }
+    return true;
+  }
+
   private void checkSortedThrowable(Tablet tablet) throws BatchExecutionException {
     if (!checkSorted(tablet)) {
       throw new BatchExecutionException("Times in Tablet are not in ascending order");
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 4110ad2..ce85103 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
@@ -412,6 +413,26 @@ public class SessionConnection {
     }
   }
 
+  /**
+   * Send an insertRecordsOfOneDevice request with this connection's session id.
+   * <p>
+   * If the call fails with a TException, reconnect() is tried once and, when it succeeds, the
+   * request is sent a second time. The first attempt is verified with
+   * verifySuccessWithRedirection, the second one with verifySuccess.
+   *
+   * @throws IoTDBConnectionException if reconnecting fails, or if the second attempt also fails
+   *                                  with a TException
+   */
+  protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.insertRecordsOfOneDevice(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(
+            MSG_RECONNECTION_FAIL);
+      }
+      try {
+        // set the session id again before resending
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.insertRecordsOfOneDevice(request));
+      } catch (TException secondFailure) {
+        throw new IoTDBConnectionException(secondFailure);
+      }
+    }
+  }
+
   protected void insertTablet(TSInsertTabletReq request)
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
     request.setSessionId(sessionId);
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 87d48eb..9e9e5fd 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -413,6 +413,61 @@ public class SessionPool {
     }
   }
 
+  /**
+   * Insert rows that all belong to the same device in batch format, which can reduce the
+   * overhead of network. This method is just like jdbc batch insert, we pack some insert
+   * requests in a batch and send them to the server. If you want to improve your performance,
+   * please see the insertTablet method.
+   * <p>
+   * Same as the six-argument overload called with haveSorted = false.
+   *
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException {
+    insertOneDeviceRecords(deviceId, times, measurementsList, typesList, valuesList, false);
+  }
+
+  /**
+   * Insert rows that all belong to the same device in batch format, which can reduce the
+   * overhead of network. This method is just like jdbc batch insert, we pack some insert
+   * requests in a batch and send them to the server. If you want to improve your performance,
+   * please see the insertTablet method.
+   * <p>
+   * The insert is attempted up to RETRY times, each time on a session taken from the pool.
+   *
+   * @param haveSorted whether the times list has been ordered.
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertOneDeviceRecords(String deviceId, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      Session pooled = getSession();
+      try {
+        pooled.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList,
+            haveSorted);
+        putBack(pooled);
+        return;
+      } catch (IoTDBConnectionException connectionFailure) {
+        // the connection is treated as broken: log it and hand the session to the cleanup
+        // helper, which (judging by its name) may rethrow; otherwise loop with another session
+        logger.warn("insertRecordsOfOneDevice failed", connectionFailure);
+        cleanSessionAndMayThrowConnectionException(pooled, attempt, connectionFailure);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session itself is still usable, so return it before propagating
+        putBack(pooled);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
 
   /**
    * Insert data in batch format, which can reduce the overhead of network. This method is just like
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
index 1d631be..d0a070d 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionSimpleIT.java
@@ -390,4 +390,134 @@ public class IoTDBSessionSimpleIT {
     session.close();
   }
 
+  /** Rows are added in descending time order and inserted without the haveSorted flag. */
+  @Test
+  public void testInsertOneDeviceRecords()
+      throws IoTDBConnectionException, StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+
+    List<Long> rowTimes = new ArrayList<>();
+    List<List<String>> rowMeasurements = new ArrayList<>();
+    List<List<TSDataType>> rowTypes = new ArrayList<>();
+    List<List<Object>> rowValues = new ArrayList<>();
+
+    addLine(rowTimes, rowMeasurements, rowTypes, rowValues,
+        3L, "s1", "s2", TSDataType.INT32, TSDataType.INT32, 1, 2);
+    addLine(rowTimes, rowMeasurements, rowTypes, rowValues,
+        2L, "s2", "s3", TSDataType.INT32, TSDataType.INT64, 3, 4L);
+    addLine(rowTimes, rowMeasurements, rowTypes, rowValues,
+        1L, "s4", "s5", TSDataType.FLOAT, TSDataType.BOOLEAN, 5.0f, Boolean.TRUE);
+
+    session.insertRecordsOfOneDevice("root.sg.d1", rowTimes, rowMeasurements, rowTypes,
+        rowValues);
+    checkResult(session);
+    session.close();
+  }
+
+  /** Rows are added in ascending time order and inserted with haveSorted = true. */
+  @Test
+  public void testInsertOneDeviceRecordsWithOrder()
+      throws IoTDBConnectionException, StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+
+    List<Long> rowTimes = new ArrayList<>();
+    List<List<String>> rowMeasurements = new ArrayList<>();
+    List<List<TSDataType>> rowTypes = new ArrayList<>();
+    List<List<Object>> rowValues = new ArrayList<>();
+
+    addLine(rowTimes, rowMeasurements, rowTypes, rowValues,
+        1L, "s4", "s5", TSDataType.FLOAT, TSDataType.BOOLEAN, 5.0f, Boolean.TRUE);
+    addLine(rowTimes, rowMeasurements, rowTypes, rowValues,
+        2L, "s2", "s3", TSDataType.INT32, TSDataType.INT64, 3, 4L);
+    addLine(rowTimes, rowMeasurements, rowTypes, rowValues,
+        3L, "s1", "s2", TSDataType.INT32, TSDataType.INT32, 1, 2);
+
+    session.insertRecordsOfOneDevice("root.sg.d1", rowTimes, rowMeasurements, rowTypes,
+        rowValues, true);
+    checkResult(session);
+    session.close();
+  }
+
+  /**
+   * Rows are added out of time order while haveSorted is true, so the insert is expected to be
+   * rejected with a BatchExecutionException.
+   */
+  @Test(expected = BatchExecutionException.class)
+  public void testInsertOneDeviceRecordsWithIncorrectOrder()
+      throws IoTDBConnectionException, StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    session.open();
+    List<Long> times = new ArrayList<>();
+    List<List<String>> measurements = new ArrayList<>();
+    List<List<TSDataType>> datatypes = new ArrayList<>();
+    List<List<Object>> values = new ArrayList<>();
+
+    addLine(times, measurements, datatypes, values, 2L, "s2", "s3", TSDataType.INT32,
+        TSDataType.INT64, 3, 4L);
+    addLine(times, measurements, datatypes, values, 3L, "s1", "s2", TSDataType.INT32,
+        TSDataType.INT32, 1, 2);
+    addLine(times, measurements, datatypes, values, 1L, "s4", "s5", TSDataType.FLOAT,
+        TSDataType.BOOLEAN, 5.0f, Boolean.TRUE);
+
+    try {
+      session.insertRecordsOfOneDevice("root.sg.d1", times, measurements, datatypes, values, true);
+    } finally {
+      // the call above is expected to throw, so statements placed after it would never run;
+      // close the session here so it is released either way
+      session.close();
+    }
+  }
+
+  /**
+   * Query root.sg.d1 and verify what the one-device insert tests wrote: the column names and
+   * types first, then exactly three rows at time 1, 2 and 3 with their non-null fields.
+   * Field positions follow the column order after Time: s3, s4, s5, s1, s2.
+   */
+  private void checkResult(Session session)
+      throws StatementExecutionException, IoTDBConnectionException {
+    SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg.d1");
+    // JUnit takes (expected, actual); the reverse order only garbles the failure message
+    Assert.assertArrayEquals(new String[] {"Time",
+        "root.sg.d1.s3", "root.sg.d1.s4", "root.sg.d1.s5", "root.sg.d1.s1", "root.sg.d1.s2"},
+        dataSet.getColumnNames().toArray(new String[0]));
+    Assert.assertArrayEquals(new TSDataType[] {TSDataType.INT64,
+        TSDataType.INT64, TSDataType.FLOAT, TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT32},
+        dataSet.getColumnTypes().toArray(new TSDataType[0]));
+    long time = 1L;
+
+    // time 1: only s4 (field 1) and s5 (field 2) were written
+    Assert.assertTrue(dataSet.hasNext());
+    RowRecord record = dataSet.next();
+    Assert.assertEquals(time, record.getTimestamp());
+    time++;
+    assertNulls(record, new int[] {0, 3, 4});
+    Assert.assertEquals(5.0f, record.getFields().get(1).getFloatV(), 0.01);
+    Assert.assertEquals(Boolean.TRUE, record.getFields().get(2).getBoolV());
+
+    // time 2: only s3 (field 0) and s2 (field 4) were written
+    Assert.assertTrue(dataSet.hasNext());
+    record = dataSet.next();
+    Assert.assertEquals(time, record.getTimestamp());
+    time++;
+    assertNulls(record, new int[] {1, 2, 3});
+    Assert.assertEquals(4L, record.getFields().get(0).getLongV());
+    Assert.assertEquals(3, record.getFields().get(4).getIntV());
+
+    // time 3: only s1 (field 3) and s2 (field 4) were written
+    Assert.assertTrue(dataSet.hasNext());
+    record = dataSet.next();
+    Assert.assertEquals(time, record.getTimestamp());
+    assertNulls(record, new int[] {0, 1, 2});
+    Assert.assertEquals(1, record.getFields().get(3).getIntV());
+    Assert.assertEquals(2, record.getFields().get(4).getIntV());
+
+    Assert.assertFalse(dataSet.hasNext());
+    dataSet.closeOperationHandle();
+  }
+
+  /**
+   * Append one row with two measurements to the parallel lists used by the insert tests: the
+   * time goes to times, and the names, types and values of the two measurements go to
+   * measurements, datatypes and values as one new inner list each.
+   */
+  private void addLine(List<Long> times, List<List<String>> measurements, List<List<TSDataType>> datatypes,
+      List<List<Object>> values, long time, String s1, String s2, TSDataType s1type, TSDataType s2type,
+      Object value1, Object value2) {
+    times.add(time);
+
+    List<String> rowMeasurements = new ArrayList<>();
+    rowMeasurements.add(s1);
+    rowMeasurements.add(s2);
+    measurements.add(rowMeasurements);
+
+    List<TSDataType> rowTypes = new ArrayList<>();
+    rowTypes.add(s1type);
+    rowTypes.add(s2type);
+    datatypes.add(rowTypes);
+
+    List<Object> rowValues = new ArrayList<>();
+    rowValues.add(value1);
+    rowValues.add(value2);
+    values.add(rowValues);
+  }
+
+  /** Assert that every field of record at the given positions reports a null data type. */
+  private void assertNulls(RowRecord record, int[] index) {
+    for (int pos = 0; pos < index.length; pos++) {
+      Assert.assertNull(record.getFields().get(index[pos]).getDataType());
+    }
+  }
+
 }