You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/08/19 13:31:40 UTC

[GitHub] [incubator-iotdb] SilverNarcissus commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

SilverNarcissus commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r473021568



##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -396,6 +519,42 @@ public void insertRecords(List<String> deviceIds, List<Long> times,
     }
   }
 
+  /**
+   * Insert multiple rows in asynchronous way. This method is just like jdbc executeBatch,
+   * we pack some insert request in batch and send them to server. If you want improve your
+   * performance, please see insertTablet method.
+   * <p>
+   * Each row is independent, which could have different deviceId, time, number of measurements
+   *
+   * @see Session#insertTablet(Tablet)
+   * @param timeout asynchronous call timeout in millisecond
+   * @param callback user provided failure callback, set to null if user does not specify.
+   */
+  public CompletableFuture<Integer> asyncInsertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList, long timeout,
+      FiveInputConsumer<List<String>, List<Long>, List<List<String>>, List<List<String>>, Throwable> callback) {
+    CompletableFuture<Integer> asyncRun = CompletableFuture.supplyAsync(() -> {
+      try {
+        insertRecords(deviceIds, times, measurementsList, valuesList);
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        throw new RuntimeException(e);
+      }
+      return 0;
+    }, asyncThreadPool.getThreadPool());
+
+    return asyncRun
+        .applyToEitherAsync(orTimeout(timeout, TimeUnit.MILLISECONDS), this::successHandler)
+        .exceptionally(e -> {
+          if (callback == null) {
+            logger.error("Error occurred when inserting records, device ID: {}, time: {}.",

Review comment:
       Maybe we should print deviceID list and time list. Or you can print the length of the list, more info means easier to debug :)

##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
##########
@@ -292,6 +357,34 @@ public void insertTablets(Map<String, Tablet> tablets)
     insertTablets(tablets, false);
   }
 
+  /**
+   * use batch interface to insert data in an asynchronous manner
+   * @param tablets
+   * @param timeout asynchronous call timeout in millisecond
+   */
+  public CompletableFuture<Integer> asyncInsertTablets(Map<String, Tablet> tablets, boolean sorted,

Review comment:
       This method has a same one in session, maybe we can abstract a class named AsyncSession which contains in both Session and SessionPool




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org