You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/06/01 16:46:28 UTC

[incubator-iotdb] branch fix-IOTDB-615 created (now 79f367e)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch fix-IOTDB-615
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 79f367e  fix session.insertTablets api and add test methods

This branch includes the following new commits:

     new 79f367e  fix session.insertTablets api and add test methods

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix session.insertTablets api and add test methods

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch fix-IOTDB-615
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 79f367e953fec2cceb9789c466fcee6a81a77843
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Tue Jun 2 00:46:05 2020 +0800

    fix session.insertTablets api and add test methods
---
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   6 +
 .../java/org/apache/iotdb/session/Session.java     | 212 ++++++++++++++-------
 .../org/apache/iotdb/session/pool/SessionPool.java |  22 +++
 3 files changed, 176 insertions(+), 64 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index d0cff5d..27cfd44 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1102,6 +1102,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   @Override
+  public TSExecuteBatchStatementResp testInsertTablets(TSInsertTabletsReq req) throws TException {
+    logger.debug("Test insert batch request receive.");
+    return RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
+  }
+
+  @Override
   public TSStatus testInsertRecord(TSInsertRecordReq req) {
     logger.debug("Test insert row request receive.");
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index f3e36ae..f546ce1 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -228,6 +228,16 @@ public class Session {
    */
   public void insertTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, BatchExecutionException {
+    TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
+    try {
+      RpcUtils.verifySuccess(client.insertTablet(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted)
+      throws BatchExecutionException {
     if (sorted) {
       if (!checkSorted(tablet)) {
         throw new BatchExecutionException("Times in Tablet are not in ascending order");
@@ -246,12 +256,7 @@ public class Session {
     request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
     request.setValues(SessionUtils.getValueBuffer(tablet));
     request.setSize(tablet.rowSize);
-
-    try {
-      RpcUtils.verifySuccess(client.insertTablet(request).statusList);
-    } catch (TException e) {
-      throw new IoTDBConnectionException(e);
-    }
+    return request;
   }
 
   /**
@@ -277,6 +282,16 @@ public class Session {
   public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, BatchExecutionException {
 
+    TSInsertTabletsReq request = genTSInsertTabletsReq(tablets, sorted);
+    try {
+      RpcUtils.verifySuccess(client.insertTablets(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  private TSInsertTabletsReq genTSInsertTabletsReq(Map<String, Tablet> tablets, boolean sorted)
+      throws BatchExecutionException {
     TSInsertTabletsReq request = new TSInsertTabletsReq();
     request.setSessionId(sessionId);
 
@@ -301,13 +316,8 @@ public class Session {
       request.addToTimestampsList(SessionUtils.getTimeBuffer(tablet));
       request.addToValuesList(SessionUtils.getValueBuffer(tablet));
       request.addToSizeList(tablet.rowSize);
-
-      try {
-        RpcUtils.verifySuccess(client.insertTablets(request).statusList);
-      } catch (TException e) {
-        throw new IoTDBConnectionException(e);
-      }
     }
+    return request;
   }
 
   /**
@@ -323,6 +333,18 @@ public class Session {
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, BatchExecutionException {
+    TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList,
+        typesList, valuesList);
+    try {
+      RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList) throws IoTDBConnectionException {
     // check params size
     int len = deviceIds.size();
     if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
@@ -343,12 +365,7 @@ public class Session {
       buffersList.add(buffer);
     }
     request.setValuesList(buffersList);
-
-    try {
-      RpcUtils.verifySuccess(client.insertRecords(request).statusList);
-    } catch (TException e) {
-      throw new IoTDBConnectionException(e);
-    }
+    return  request;
   }
 
   /**
@@ -363,6 +380,18 @@ public class Session {
   public void insertRecords(List<String> deviceIds, List<Long> times,
       List<List<String>> measurementsList, List<List<String>> valuesList)
       throws IoTDBConnectionException, BatchExecutionException {
+
+    TSInsertRecordsReq request =genTSInsertRecordsReq(deviceIds, times, measurementsList, valuesList);
+    try {
+      RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException {
     // check params size
     int len = deviceIds.size();
     if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
@@ -384,15 +413,9 @@ public class Session {
       buffersList.add(buffer);
     }
     request.setValuesList(buffersList);
-
-    try {
-      RpcUtils.verifySuccess(client.insertRecords(request).statusList);
-    } catch (TException e) {
-      throw new IoTDBConnectionException(e);
-    }
+    return  request;
   }
 
-
   /**
    * insert data in one row, if you want improve your performance, please use insertInBatch method
    * or insertBatch method
@@ -403,6 +426,17 @@ public class Session {
   public void insertRecord(String deviceId, long time, List<String> measurements,
       List<TSDataType> types,
       List<Object> values) throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values);
+    try {
+      RpcUtils.verifySuccess(client.insertRecord(request));
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  private TSInsertRecordReq genTSInsertRecordReq(String deviceId, long time, List<String> measurements,
+      List<TSDataType> types,
+      List<Object> values) throws IoTDBConnectionException {
     TSInsertRecordReq request = new TSInsertRecordReq();
     request.setSessionId(sessionId);
     request.setDeviceId(deviceId);
@@ -412,12 +446,7 @@ public class Session {
     putValues(types, values, buffer);
     buffer.flip();
     request.setValues(buffer);
-
-    try {
-      RpcUtils.verifySuccess(client.insertRecord(request));
-    } catch (TException e) {
-      throw new IoTDBConnectionException(e);
-    }
+    return  request;
   }
 
   /**
@@ -429,6 +458,17 @@ public class Session {
    */
   public void insertRecord(String deviceId, long time, List<String> measurements,
       List<String> values) throws IoTDBConnectionException, StatementExecutionException {
+
+    TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, values);
+    try {
+      RpcUtils.verifySuccess(client.insertRecord(request));
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  private TSInsertRecordReq genTSInsertRecordReq(String deviceId, long time, List<String> measurements,
+      List<String> values) throws IoTDBConnectionException {
     TSInsertRecordReq request = new TSInsertRecordReq();
     request.setSessionId(sessionId);
     request.setDeviceId(deviceId);
@@ -439,14 +479,10 @@ public class Session {
     putStrValues(values, buffer);
     buffer.flip();
     request.setValues(buffer);
-
-    try {
-      RpcUtils.verifySuccess(client.insertRecord(request));
-    } catch (TException e) {
-      throw new IoTDBConnectionException(e);
-    }
+    return request;
   }
 
+
   private void putStrValues(List<String> values, ByteBuffer buffer)
       throws IoTDBConnectionException {
     for (int i = 0; i < values.size(); i++) {
@@ -550,16 +586,22 @@ public class Session {
    */
   public void testInsertTablet(Tablet tablet)
       throws IoTDBConnectionException, BatchExecutionException {
-    TSInsertTabletReq request = new TSInsertTabletReq();
-    request.setSessionId(sessionId);
-    request.deviceId = tablet.deviceId;
-    for (MeasurementSchema measurementSchema : tablet.getSchemas()) {
-      request.addToMeasurements(measurementSchema.getMeasurementId());
-      request.addToTypes(measurementSchema.getType().ordinal());
+    TSInsertTabletReq request = genTSInsertTabletReq(tablet, false);
+
+    try {
+      RpcUtils.verifySuccess(client.testInsertTablet(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
     }
-    request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
-    request.setValues(SessionUtils.getValueBuffer(tablet));
-    request.setSize(tablet.rowSize);
+  }
+
+  /**
+   * This method NOT insert data into database and the server just return after accept the request,
+   * this method should be used to test other time cost in client
+   */
+  public void testInsertTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, BatchExecutionException {
+    TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted);
 
     try {
       RpcUtils.verifySuccess(client.testInsertTablet(request).statusList);
@@ -572,23 +614,54 @@ public class Session {
    * This method NOT insert data into database and the server just return after accept the request,
    * this method should be used to test other time cost in client
    */
+  public void testInsertTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, BatchExecutionException {
+    TSInsertTabletsReq request = genTSInsertTabletsReq(tablets, false);
+
+    try {
+      RpcUtils.verifySuccess(client.testInsertTablets(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  /**
+   * This method NOT insert data into database and the server just return after accept the request,
+   * this method should be used to test other time cost in client
+   */
+  public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, BatchExecutionException {
+    TSInsertTabletsReq request = genTSInsertTabletsReq(tablets, sorted);
+
+    try {
+      RpcUtils.verifySuccess(client.testInsertTablets(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  /**
+   * This method NOT insert data into database and the server just return after accept the request,
+   * this method should be used to test other time cost in client
+   */
   public void testInsertRecords(List<String> deviceIds, List<Long> times,
       List<List<String>> measurementsList, List<List<String>> valuesList)
       throws IoTDBConnectionException, BatchExecutionException {
-    // check params size
-    int len = deviceIds.size();
-    if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) {
-      throw new IllegalArgumentException(
-          "deviceIds, times, measurementsList and valuesList's size should be equal");
-    }
+    TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList, valuesList);
 
-    TSInsertRecordsReq request = new TSInsertRecordsReq();
-    request.setSessionId(sessionId);
-    request.setDeviceIds(deviceIds);
-    request.setTimestamps(times);
-    request.setMeasurementsList(measurementsList);
-    request.setValuesList(new ArrayList<>());
+    try {
+      RpcUtils.verifySuccess(client.testInsertRecords(request).statusList);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
 
+  public void testInsertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, BatchExecutionException {
+    TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList,
+        typesList, valuesList);
     try {
       RpcUtils.verifySuccess(client.testInsertRecords(request).statusList);
     } catch (TException e) {
@@ -596,18 +669,29 @@ public class Session {
     }
   }
 
+
   /**
    * This method NOT insert data into database and the server just return after accept the request,
    * this method should be used to test other time cost in client
    */
   public void testInsertRecord(String deviceId, long time, List<String> measurements,
       List<String> values) throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertRecordReq request = new TSInsertRecordReq();
-    request.setSessionId(sessionId);
-    request.setDeviceId(deviceId);
-    request.setTimestamp(time);
-    request.setMeasurements(measurements);
-    request.setValues(ByteBuffer.allocate(1));
+    TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, values);
+
+    try {
+      RpcUtils.verifySuccess(client.testInsertRecord(request));
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  /**
+   * This method NOT insert data into database and the server just return after accept the request,
+   * this method should be used to test other time cost in client
+   */
+  public void testInsertRecord(String deviceId, long time, List<String> measurements,
+      List<TSDataType> types, List<Object> values) throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values);
 
     try {
       RpcUtils.verifySuccess(client.testInsertRecord(request));
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 97cb37d..c08cf9d 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -488,6 +488,28 @@ public class SessionPool {
   }
 
   /**
+   * This method NOT insert data into database and the server just return after accept the request,
+   * this method should be used to test other time cost in client
+   */
+  public void testInsertRecord(String deviceId, long time, List<String> measurements,
+      List<TSDataType> types, List<Object> values) throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.testInsertRecord(deviceId, time, measurements, types, values);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
    * delete a timeseries, including data and schema
    *
    * @param path timeseries to delete, should be a whole path