You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/15 14:33:43 UTC

[incubator-iotdb] 01/01: add insertInBatch in Session

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch add_insertInBatch
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 66aaf6502312db71a974fac774f13b5cd897abe2
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Mar 15 22:33:23 2020 +0800

    add insertInBatch in Session
---
 .../main/java/org/apache/iotdb/SessionExample.java | 34 +++++++++++
 grafana/pom.xml                                    |  2 +-
 jdbc/pom.xml                                       |  2 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 29 +++++++++
 service-rpc/pom.xml                                |  2 +-
 service-rpc/rpc-changelist.md                      |  2 +
 service-rpc/src/main/thrift/rpc.thrift             | 14 +++++
 .../java/org/apache/iotdb/session/Session.java     | 69 +++++++++++++++-------
 .../org/apache/iotdb/session/IoTDBSessionIT.java   | 64 +++++++++++++++++++-
 9 files changed, 192 insertions(+), 26 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 8e41dbd..49c365e 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -49,6 +49,7 @@ public class SessionExample {
 
     insert();
     insertRowBatch();
+    insertInBatch();
     nonQuery();
     query();
     deleteData();
@@ -101,6 +102,39 @@ public class SessionExample {
     }
   }
 
+  /** Writes 500 rows (time 0..499) for device root.sg1.d2 through session.insertInBatch. */
+  private static void insertInBatch() throws IoTDBSessionException {
+    List<String> sensors = new ArrayList<>();
+    for (String sensor : new String[]{"s1", "s2", "s3"}) {
+      sensors.add(sensor);
+    }
+    List<String> devices = new ArrayList<>();
+    List<Long> times = new ArrayList<>();
+    List<List<String>> sensorsPerRow = new ArrayList<>();
+    List<List<String>> valuesPerRow = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<String> rowValues = new ArrayList<>();
+      for (String value : new String[]{"1", "2", "3"}) {
+        rowValues.add(value);
+      }
+      devices.add("root.sg1.d2");
+      times.add(time);
+      sensorsPerRow.add(sensors); // every row shares the same measurement list
+      valuesPerRow.add(rowValues);
+
+      boolean flushNow = time > 0 && time % 100 == 0;
+      if (flushNow) { // send what is buffered, then start a new batch
+        session.insertInBatch(devices, times, sensorsPerRow, valuesPerRow);
+        devices.clear();
+        times.clear();
+        sensorsPerRow.clear();
+        valuesPerRow.clear();
+      }
+    }
+    session.insertInBatch(devices, times, sensorsPerRow, valuesPerRow); // rows after last flush
+  }
+
   private static void deleteData() throws IoTDBSessionException {
     String path = "root.sg1.d1.s1";
     long deleteTime = 99;
diff --git a/grafana/pom.xml b/grafana/pom.xml
index 2a4f3e1..f50f0fd 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -165,7 +165,7 @@
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         <resource>META-INF/spring.schemas</resource>
                                     </transformer>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                         <mainClass>${start-class}</mainClass>
                                     </transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 36f373c..5e0b55f 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -126,7 +126,7 @@
                                                 </goals>
                                             </pluginExecutionFilter>
                                             <action>
-                                                <ignore />
+                                                <ignore/>
                                             </action>
                                         </pluginExecution>
                                     </pluginExecutions>
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index da61a4f..23c4368 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -90,6 +90,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteInsertRowInBatchResp;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
@@ -99,6 +100,7 @@ import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSHandleIdentifier;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertInBatchReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
@@ -513,6 +515,33 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   @Override
+  public TSExecuteInsertRowInBatchResp insertRowInBatch(TSInsertInBatchReq req) {
+    // Handles the rows of req one at a time; the response gets one status per handled row.
+    TSExecuteInsertRowInBatchResp resp = new TSExecuteInsertRowInBatchResp();
+    if (!checkLogin()) {
+      // not logged in: answer with a single NOT_LOGIN_ERROR status and run nothing
+      logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
+      resp.addToStatusList(new TSStatus(getStatus(TSStatusCode.NOT_LOGIN_ERROR)));
+      return resp;
+    }
+
+    int rowCount = req.getDeviceIds().size();
+    InsertPlan plan = new InsertPlan(); // one plan object, refilled for every row
+    for (int row = 0; row < rowCount; row++) {
+      plan.setDeviceId(req.getDeviceIds().get(row));
+      plan.setTime(req.getTimestamps().get(row));
+      plan.setMeasurements(req.getMeasurementsList().get(row).toArray(new String[0]));
+      plan.setValues(req.getValuesList().get(row).toArray(new String[0]));
+
+      // a non-null result of checkAuthority is reported for the row instead of executing it
+      TSStatus denied = checkAuthority(plan);
+      resp.addToStatusList(denied != null ? new TSStatus(denied) : executePlan(plan));
+    }
+
+    return resp;
+  }
+
+  @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
     List<Integer> result = new ArrayList<>();
diff --git a/service-rpc/pom.xml b/service-rpc/pom.xml
index c3043e2..ce6abd4 100644
--- a/service-rpc/pom.xml
+++ b/service-rpc/pom.xml
@@ -148,7 +148,7 @@
                                                 </goals>
                                             </pluginExecutionFilter>
                                             <action>
-                                                <ignore />
+                                                <ignore/>
                                             </action>
                                         </pluginExecution>
                                         <pluginExecution>
diff --git a/service-rpc/rpc-changelist.md b/service-rpc/rpc-changelist.md
index 5e08140..6425a98 100644
--- a/service-rpc/rpc-changelist.md
+++ b/service-rpc/rpc-changelist.md
@@ -51,6 +51,8 @@ Last Updated on October 27th, 2019 by Lei Rui.
 | Add method TSStatus deleteData(1:TSDeleteDataReq req)        | Jack Tsai, Jialin Qiao, qiaojialin |
 | Add method TSStatus deleteTimeseries(1:list\<string> path)   | qiaojialin                         |
 | Add method TSStatus deleteStorageGroups(1:list\<string> storageGroup) | Yi Tao                             |
+| Add Struct TSExecuteInsertRowInBatchResp and Struct TSInsertInBatchReq | Kaifeng Xue |
+| Add method TSExecuteInsertRowInBatchResp insertRowInBatch(1:TSInsertInBatchReq req) | Kaifeng Xue |
 
 
 ## 3. Update
diff --git a/service-rpc/src/main/thrift/rpc.thrift b/service-rpc/src/main/thrift/rpc.thrift
index 5de9c06..533aa3d 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -233,6 +233,18 @@ struct TSBatchInsertionReq {
     6: required i32 size
 }
 
+struct TSExecuteInsertRowInBatchResp{ // response type of insertRowInBatch
+	1: required list<TSStatus> statusList // result statuses reported for the request
+}
+
+
+struct TSInsertInBatchReq { // a batch of rows; entry i of every list belongs to row i
+    1: required list<string> deviceIds // device of each row
+    2: required list<list<string>> measurementsList // measurement names of each row
+    3: required list<list<string>> valuesList // values of each row, sent as strings
+    4: required list<i64> timestamps // timestamp of each row
+}
+
 struct TSInsertReq {
     1: required string deviceId
     2: required list<string> measurements
@@ -289,6 +301,8 @@ service TSIService {
 
 	TSExecuteBatchStatementResp insertBatch(1:TSBatchInsertionReq req);
 
+  TSExecuteInsertRowInBatchResp insertRowInBatch(1:TSInsertInBatchReq req); // insert a batch of rows in one call
+
 	TSStatus setStorageGroup(1:string storageGroup);
 
 	TSStatus createTimeseries(1:TSCreateTimeseriesReq req);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 78791b9..5f93e00 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -36,7 +36,6 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.SQLException;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
@@ -57,7 +56,6 @@ public class Session {
   private TSocket transport;
   private boolean isClosed = true;
   private ZoneId zoneId;
-  private TSOperationHandle operationHandle;
   private long statementId;
   private int fetchSize;
 
@@ -86,11 +84,11 @@ public class Session {
     this.fetchSize = fetchSize;
   }
 
-  public synchronized void open() throws IoTDBSessionException {
+  public void open() throws IoTDBSessionException {
     open(false, 0);
   }
 
-  private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+  private void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBSessionException {
     if (!isClosed) {
       return;
@@ -146,7 +144,7 @@ public class Session {
 
   }
 
-  public synchronized void close() throws IoTDBSessionException {
+  public void close() throws IoTDBSessionException {
     if (isClosed) {
       return;
     }
@@ -164,7 +162,7 @@ public class Session {
     }
   }
 
-  public synchronized TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch)
+  public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch)
       throws IoTDBSessionException {
     TSBatchInsertionReq request = new TSBatchInsertionReq();
     request.deviceId = rowBatch.deviceId;
@@ -183,7 +181,38 @@ public class Session {
     }
   }
 
-  public synchronized TSStatus insert(String deviceId, long time, List<String> measurements,
+  /**
+   * Insert several rows with one RPC, which can reduce the overhead of network. Row i is built
+   * from entry i of each list; an IllegalArgumentException is thrown if the list sizes differ.
+   */
+  public List<TSStatus> insertInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBSessionException {
+    int rowCount = deviceIds.size();
+    if (!(rowCount == times.size() && rowCount == measurementsList.size()
+        && rowCount == valuesList.size())) {
+      throw new IllegalArgumentException(
+          "deviceIds, times, measurementsList and valuesList's size should be equal");
+    }
+    TSInsertInBatchReq request = new TSInsertInBatchReq();
+    request.setDeviceIds(deviceIds);
+    request.setTimestamps(times);
+    request.setMeasurementsList(measurementsList);
+    request.setValuesList(valuesList);
+
+    try {
+      List<TSStatus> serverStatuses = client.insertRowInBatch(request).getStatusList();
+      List<TSStatus> checked = new ArrayList<>(serverStatuses.size());
+      for (TSStatus status : serverStatuses) {
+        checked.add(checkAndReturn(status));
+      }
+      return checked;
+    } catch (TException e) {
+      throw new IoTDBSessionException(e);
+    }
+  }
+
+  public TSStatus insert(String deviceId, long time, List<String> measurements,
       List<String> values)
       throws IoTDBSessionException {
     TSInsertReq request = new TSInsertReq();
@@ -204,7 +233,7 @@ public class Session {
    *
    * @param path timeseries to delete, should be a whole path
    */
-  synchronized TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
+  public TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
     List<String> paths = new ArrayList<>();
     paths.add(path);
     return deleteTimeseries(paths);
@@ -215,7 +244,7 @@ public class Session {
    *
    * @param paths timeseries to delete, should be a whole path
    */
-  public synchronized TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
+  public TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
     try {
       return checkAndReturn(client.deleteTimeseries(paths));
     } catch (TException e) {
@@ -229,7 +258,7 @@ public class Session {
    * @param path data in which time series to delete
    * @param time data with time stamp less than or equal to time will be deleted
    */
-  public synchronized TSStatus deleteData(String path, long time) throws IoTDBSessionException {
+  public TSStatus deleteData(String path, long time) throws IoTDBSessionException {
     List<String> paths = new ArrayList<>();
     paths.add(path);
     return deleteData(paths, time);
@@ -241,7 +270,7 @@ public class Session {
    * @param paths data in which time series to delete
    * @param time data with time stamp less than or equal to time will be deleted
    */
-  synchronized TSStatus deleteData(List<String> paths, long time)
+  public TSStatus deleteData(List<String> paths, long time)
       throws IoTDBSessionException {
     TSDeleteDataReq request = new TSDeleteDataReq();
     request.setPaths(paths);
@@ -254,7 +283,7 @@ public class Session {
     }
   }
 
-  public synchronized TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
+  public TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
     checkPathValidity(storageGroupId);
     try {
       return checkAndReturn(client.setStorageGroup(storageGroupId));
@@ -264,14 +293,14 @@ public class Session {
   }
 
 
-  synchronized TSStatus deleteStorageGroup(String storageGroup)
+  public TSStatus deleteStorageGroup(String storageGroup)
       throws IoTDBSessionException {
     List<String> groups = new ArrayList<>();
     groups.add(storageGroup);
     return deleteStorageGroups(groups);
   }
 
-  synchronized TSStatus deleteStorageGroups(List<String> storageGroup)
+  public TSStatus deleteStorageGroups(List<String> storageGroup)
       throws IoTDBSessionException {
     try {
       return checkAndReturn(client.deleteStorageGroups(storageGroup));
@@ -280,7 +309,7 @@ public class Session {
     }
   }
 
-  public synchronized TSStatus createTimeseries(String path, TSDataType dataType,
+  public TSStatus createTimeseries(String path, TSDataType dataType,
       TSEncoding encoding, CompressionType compressor) throws IoTDBSessionException {
     checkPathValidity(path);
     TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
@@ -310,7 +339,7 @@ public class Session {
     return resp;
   }
 
-  private synchronized String getTimeZone() throws TException, IoTDBRPCException {
+  private String getTimeZone() throws TException, IoTDBRPCException {
     if (zoneId != null) {
       return zoneId.toString();
     }
@@ -320,7 +349,7 @@ public class Session {
     return resp.getTimeZone();
   }
 
-  private synchronized void setTimeZone(String zoneId) throws TException, IoTDBRPCException {
+  private void setTimeZone(String zoneId) throws TException, IoTDBRPCException {
     TSSetTimeZoneReq req = new TSSetTimeZoneReq(zoneId);
     TSStatus resp = client.setTimeZone(req);
     RpcUtils.verifySuccess(resp);
@@ -355,9 +384,9 @@ public class Session {
     TSExecuteStatementResp execResp = client.executeStatement(execReq);
 
     RpcUtils.verifySuccess(execResp.getStatus());
-    operationHandle = execResp.getOperationHandle();
     SessionDataSet dataSet = new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
-            operationHandle.getOperationId().getQueryId(), client, operationHandle);
+        execResp.getOperationHandle().getOperationId().getQueryId(), client,
+        execResp.getOperationHandle());
     dataSet.setBatchSize(fetchSize);
     return dataSet;
   }
@@ -375,8 +404,6 @@ public class Session {
 
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionHandle, sql, statementId);
     TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
-    operationHandle = execResp.getOperationHandle();
-
     RpcUtils.verifySuccess(execResp.getStatus());
   }
 
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 30c7afc..5a36510 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -93,6 +93,10 @@ public class IoTDBSessionIT {
 
     query2();
 
+    insertInBatch();
+
+    query4();
+
     // Add another storage group to test the deletion of storage group
     session.setStorageGroup("root.sg2");
     session.createTimeseries("root.sg2.d1.s1", TSDataType.INT64, TSEncoding.RLE,
@@ -132,6 +136,45 @@ public class IoTDBSessionIT {
         CompressionType.SNAPPY);
     session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE,
         CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d2.s2", TSDataType.INT64, TSEncoding.RLE,
+        CompressionType.SNAPPY);
+    session.createTimeseries("root.sg1.d2.s3", TSDataType.INT64, TSEncoding.RLE,
+        CompressionType.SNAPPY);
+  }
+
+  private void insertInBatch() throws IoTDBSessionException {
+    // Buffers 500 rows for root.sg1.d2 (time 0..499) and sends them with session.insertInBatch.
+    List<String> sensorNames = new ArrayList<>();
+    sensorNames.add("s1");
+    sensorNames.add("s2");
+    sensorNames.add("s3");
+    List<String> devices = new ArrayList<>();
+    List<Long> times = new ArrayList<>();
+    List<List<String>> sensorsPerRow = new ArrayList<>();
+    List<List<String>> valuesPerRow = new ArrayList<>();
+
+    for (long time = 0; time < 500; time++) {
+      List<String> rowValues = new ArrayList<>();
+      for (int v = 1; v <= 3; v++) {
+        rowValues.add(String.valueOf(v));
+      }
+      devices.add("root.sg1.d2");
+      times.add(time);
+      sensorsPerRow.add(sensorNames); // the same measurement list is shared by every row
+      valuesPerRow.add(rowValues);
+
+      if (time > 0 && time % 100 == 0) { // flush the buffered rows, then start a new batch
+        session.insertInBatch(devices, times, sensorsPerRow, valuesPerRow);
+        devices.clear();
+        times.clear();
+        sensorsPerRow.clear();
+        valuesPerRow.clear();
+      }
+    }
+    // rows buffered since the last flush
+    session.insertInBatch(devices, times, sensorsPerRow, valuesPerRow);
   }
 
   private void insert() throws IoTDBSessionException {
@@ -199,7 +242,8 @@ public class IoTDBSessionIT {
   private void query() throws ClassNotFoundException, SQLException {
     Class.forName(Config.JDBC_DRIVER_NAME);
     String standard =
-        "Time\n" + "root.sg1.d1.s1\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n";
+        "Time\n" + "root.sg1.d1.s1\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n"
+            + "root.sg1.d2.s1\n" + "root.sg1.d2.s2\n" + "root.sg1.d2.s3\n";
     try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
@@ -223,7 +267,8 @@ public class IoTDBSessionIT {
   private void query2() throws ClassNotFoundException, SQLException {
     Class.forName(Config.JDBC_DRIVER_NAME);
     String standard =
-        "Time\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n";
+        "Time\n" + "root.sg1.d1.s2\n" + "root.sg1.d1.s3\n"
+            + "root.sg1.d2.s1\n" + "root.sg1.d2.s2\n" + "root.sg1.d2.s3\n";
     try (Connection connection = DriverManager
         .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
@@ -300,6 +345,21 @@ public class IoTDBSessionIT {
     sessionDataSet.closeOperationHandle();
   }
 
+  /** Checks that root.sg1.d2 yields 500 rows whose fields equal 1, 2, 3, ... in order. */
+  private void query4() throws TException, IoTDBRPCException, SQLException {
+    SessionDataSet sessionDataSet = session.executeQueryStatement("select * from root.sg1.d2");
+    sessionDataSet.setBatchSize(1024);
+    int count = 0;
+    while (sessionDataSet.hasNext()) {
+      count++;
+      long expected = 1; // restarts for every row; incremented once per field
+      for (Field f : sessionDataSet.next().getFields()) {
+        Assert.assertEquals(expected++, f.getLongV()); // argument order: (expected, actual)
+      }
+    }
+    Assert.assertEquals(500, count);
+    sessionDataSet.closeOperationHandle();
+  }
 
   private void insert_via_sql() throws TException, IoTDBRPCException {
     session.executeNonQueryStatement(