Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:27:17 UTC

[GitHub] [incubator-iotdb] wshao08 opened a new pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

wshao08 opened a new pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337


   JIRA issue: https://issues.apache.org/jira/browse/IOTDB-746
   This PR adds corresponding asynchronous insertion interfaces for Session and SessionPool.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r439222673



##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionThreadPool.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.session.Config;
+
+public class SessionThreadPool {
+
+  private ExecutorService pool;
+  private BlockingQueue<Runnable> threadQueue;
+  private static final int WAIT_TIMEOUT = 10000;
+
+  public SessionThreadPool() {
+    threadQueue = new LinkedBlockingQueue<Runnable>(Config.DEFAULT_BLOCKING_QUEUE_SIZE);
+    pool = new ThreadPoolExecutor(Config.DEFAULT_THREAD_POOL_SIZE, Config.DEFAULT_THREAD_POOL_SIZE,
+        0L, TimeUnit.MILLISECONDS, threadQueue, new CustomPolicy());
+  }
+
+  public SessionThreadPool(int poolSize, int blockingQueueSize) {
+    threadQueue = new LinkedBlockingQueue<Runnable>(blockingQueueSize);
+    pool = new ThreadPoolExecutor(poolSize, poolSize,
+        0L, TimeUnit.MILLISECONDS, threadQueue, new CustomPolicy());
+  }
+
+  public synchronized Future<?> submit(Runnable task) {
+    return pool.submit(task);
+  }
+
+  public synchronized <T> Future<T> submit(Callable<T> task) {
+    return pool.submit(task);
+  }
+
+  public Executor getThreadPool() {
+    return pool;
+  }
+
+  private static class CustomPolicy implements RejectedExecutionHandler {
+    public CustomPolicy() {}
+
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      try {
+        synchronized (r) {
+          r.wait(WAIT_TIMEOUT);
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();

Review comment:
       Use a logger here instead of e.printStackTrace().
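
A minimal sketch of what this could look like, not the code that was merged. It assumes `java.util.logging` only to stay self-contained (the project logger may differ), and the class name `LoggingRejectionPolicy` and its `waitTimeoutMs` parameter are hypothetical. Instead of waiting on the rejected `Runnable`, it waits for space in the executor's bounded queue, logs the failure, and restores the interrupt flag.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical replacement for CustomPolicy; names are illustrative only.
class LoggingRejectionPolicy implements RejectedExecutionHandler {
  private static final Logger LOGGER =
      Logger.getLogger(LoggingRejectionPolicy.class.getName());

  private final long waitTimeoutMs;

  LoggingRejectionPolicy(long waitTimeoutMs) {
    this.waitTimeoutMs = waitTimeoutMs;
  }

  @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    // Never enqueue into a pool that will not run the task.
    if (executor.isShutdown()) {
      throw new RejectedExecutionException("Executor has been shut down");
    }
    try {
      // Wait up to waitTimeoutMs for space in the bounded queue.
      boolean queued = executor.getQueue().offer(r, waitTimeoutMs, TimeUnit.MILLISECONDS);
      if (!queued) {
        LOGGER.log(Level.WARNING, "Task rejected: queue still full after {0} ms", waitTimeoutMs);
        throw new RejectedExecutionException("Queue full after waiting " + waitTimeoutMs + " ms");
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      LOGGER.log(Level.WARNING, "Interrupted while waiting to enqueue task", e);
      throw new RejectedExecutionException("Interrupted while waiting to enqueue", e);
    }
  }
}
```

Throwing `RejectedExecutionException` keeps the failure visible to the submitter; silently returning from the handler would drop the task.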







[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r448861261



##########
File path: docs/UserGuide/Client/Programming - Native API.md
##########
@@ -150,6 +159,38 @@ Here we show the commonly used interfaces and their parameters in the Native API
         List<List<String>> measurementsList, List<List<TSDataType>> typesList,
         List<List<Object>> valuesList)
   ```
+  
+* Insert multiple Records asynchronously by providing additional asynchronous timeout and an exception callback. With type info the server has no need to do type inference, which leads a better performance

Review comment:
       ```suggestion
   * Insert multiple records asynchronously by providing an additional asynchronous timeout and an exception callback. With type info, the server does not need to do type inference, which leads to better performance
   ```







[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r437479376



##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
##########
@@ -247,6 +262,16 @@ public void insertTablet(Tablet tablet)
     insertTablet(tablet, false);
   }
 
+  public void asyncInsertTablet(Tablet tablet) {
+    threadPool.submit(() -> {
+      try {
+        insertTablet(tablet);
+      } catch (BatchExecutionException | IoTDBConnectionException e) {
+        e.printStackTrace();

Review comment:
       Use a logger here instead of e.printStackTrace().







[GitHub] [incubator-iotdb] SilverNarcissus commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
SilverNarcissus commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r473021568



##########
File path: session/src/main/java/org/apache/iotdb/session/Session.java
##########
@@ -396,6 +519,42 @@ public void insertRecords(List<String> deviceIds, List<Long> times,
     }
   }
 
+  /**
+   * Insert multiple rows in asynchronous way. This method is just like jdbc executeBatch,
+   * we pack some insert request in batch and send them to server. If you want improve your
+   * performance, please see insertTablet method.
+   * <p>
+   * Each row is independent, which could have different deviceId, time, number of measurements
+   *
+   * @see Session#insertTablet(Tablet)
+   * @param timeout asynchronous call timeout in millisecond
+   * @param callback user provided failure callback, set to null if user does not specify.
+   */
+  public CompletableFuture<Integer> asyncInsertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList, long timeout,
+      FiveInputConsumer<List<String>, List<Long>, List<List<String>>, List<List<String>>, Throwable> callback) {
+    CompletableFuture<Integer> asyncRun = CompletableFuture.supplyAsync(() -> {
+      try {
+        insertRecords(deviceIds, times, measurementsList, valuesList);
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        throw new RuntimeException(e);
+      }
+      return 0;
+    }, asyncThreadPool.getThreadPool());
+
+    return asyncRun
+        .applyToEitherAsync(orTimeout(timeout, TimeUnit.MILLISECONDS), this::successHandler)
+        .exceptionally(e -> {
+          if (callback == null) {
+            logger.error("Error occurred when inserting records, device ID: {}, time: {}.",

Review comment:
       Maybe we should print the deviceId list and the time list, or at least the length of each list. More info makes debugging easier :)
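
One way to do this, as a rough sketch: build the error message from the list sizes plus a bounded preview, so the log stays readable for large batches. The helper name `InsertErrorMessages.describe` and the `previewLimit` parameter are hypothetical and not part of the PR.

```java
import java.util.List;

// Hypothetical helper; it only formats a message and does no logging itself.
class InsertErrorMessages {

  // Reports each list's size and at most previewLimit leading elements.
  static String describe(List<String> deviceIds, List<Long> times, int previewLimit) {
    int deviceCount = Math.min(previewLimit, deviceIds.size());
    int timeCount = Math.min(previewLimit, times.size());
    return String.format(
        "Error occurred when inserting records, deviceIds(size=%d): %s, times(size=%d): %s",
        deviceIds.size(), deviceIds.subList(0, deviceCount),
        times.size(), times.subList(0, timeCount));
  }
}
```

The resulting string can be passed to the existing `logger.error` call together with the exception.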

##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
##########
@@ -292,6 +357,34 @@ public void insertTablets(Map<String, Tablet> tablets)
     insertTablets(tablets, false);
   }
 
+  /**
+   * use batch interface to insert data in an asynchronous manner
+   * @param tablets
+   * @param timeout asynchronous call timeout in millisecond
+   */
+  public CompletableFuture<Integer> asyncInsertTablets(Map<String, Tablet> tablets, boolean sorted,

Review comment:
       Session has an identical method. Maybe we can extract a class named AsyncSession that is contained in both Session and SessionPool.
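
A rough sketch of such a shared helper, under the assumption that both classes only need "run this blocking insert on an executor and return a future". The names `AsyncSessionSketch` and `InsertAction` are hypothetical; the real Session and SessionPool signatures are not reproduced here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical shared helper that Session and SessionPool could both hold.
class AsyncSessionSketch {

  // A blocking insert call that may throw a checked exception.
  @FunctionalInterface
  interface InsertAction {
    void run() throws Exception;
  }

  private final Executor executor;

  AsyncSessionSketch(Executor executor) {
    this.executor = executor;
  }

  // Runs the blocking insert on the executor; completes with 0 on success.
  CompletableFuture<Integer> submit(InsertAction action) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        action.run();
        return 0;
      } catch (Exception e) {
        // Wrap checked exceptions so the future completes exceptionally.
        throw new CompletionException(e);
      }
    }, executor);
  }
}
```

Each class would then reduce its async methods to one line, for example `helper.submit(() -> insertTablets(tablets, sorted))`, keeping the timeout and callback wiring in a single place.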







[GitHub] [incubator-iotdb] wshao08 commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
wshao08 commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r439888671



##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
##########
@@ -420,6 +555,43 @@ public void insertRecord(String deviceId, long time, List<String> measurements,
     }
   }
 
+  /**
+   * insert data in one row asynchronously, if you want improve your performance,
+   * please use insertRecords method or insertTablet method
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  public void asyncInsertRecord(String deviceId, long time, List<String> measurements,
+      List<String> values) {
+    CompletableFuture<Void> timeout = failAfter(Duration.ofSeconds(ASYNC_TIMEOUT));
+    CompletableFuture<Void> asyncRun = CompletableFuture.supplyAsync(() -> {
+      try {
+        insertRecord(deviceId, time, measurements, values);
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        logger.error("Error occurred when inserting tablets: ", e);
+      }
+      return null;
+    }, threadPool.getThreadPool());
+
+    asyncRun.acceptEither(timeout, this::asyncHandler);
+  }
+
+  private static <T> CompletableFuture<T> failAfter(Duration duration) {
+    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+    CompletableFuture<T> promise = new CompletableFuture<>();
+    scheduler.schedule(() -> {
+      final TimeoutException ex = new TimeoutException("Timeout after " + duration);
+      return promise.completeExceptionally(ex);
+    }, duration.toMillis(), TimeUnit.MILLISECONDS);
+    return promise;
+  }
+
+  private void asyncHandler(Void aVoid) {
+    logger.info("Insertion executed successfully.");

Review comment:
       Fixed

##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionThreadPool.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.session.Config;
+
+public class SessionThreadPool {
+
+  private ExecutorService pool;
+  private BlockingQueue<Runnable> threadQueue;
+  private static final int WAIT_TIMEOUT = 10000;
+
+  public SessionThreadPool() {
+    threadQueue = new LinkedBlockingQueue<Runnable>(Config.DEFAULT_BLOCKING_QUEUE_SIZE);
+    pool = new ThreadPoolExecutor(Config.DEFAULT_THREAD_POOL_SIZE, Config.DEFAULT_THREAD_POOL_SIZE,
+        0L, TimeUnit.MILLISECONDS, threadQueue, new CustomPolicy());
+  }
+
+  public SessionThreadPool(int poolSize, int blockingQueueSize) {
+    threadQueue = new LinkedBlockingQueue<Runnable>(blockingQueueSize);
+    pool = new ThreadPoolExecutor(poolSize, poolSize,
+        0L, TimeUnit.MILLISECONDS, threadQueue, new CustomPolicy());
+  }
+
+  public synchronized Future<?> submit(Runnable task) {
+    return pool.submit(task);
+  }
+
+  public synchronized <T> Future<T> submit(Callable<T> task) {
+    return pool.submit(task);
+  }
+
+  public Executor getThreadPool() {
+    return pool;
+  }
+
+  private static class CustomPolicy implements RejectedExecutionHandler {
+    public CustomPolicy() {}
+
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      try {
+        synchronized (r) {
+          r.wait(WAIT_TIMEOUT);
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();

Review comment:
       Fixed

##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionThreadPool.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.session.Config;
+
+public class SessionThreadPool {
+
+  private ExecutorService pool;
+  private BlockingQueue<Runnable> threadQueue;
+  private static final int WAIT_TIMEOUT = 10000;

Review comment:
       Fixed







[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r439251773



##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
##########
@@ -420,6 +555,43 @@ public void insertRecord(String deviceId, long time, List<String> measurements,
     }
   }
 
+  /**
+   * insert data in one row asynchronously, if you want improve your performance,
+   * please use insertRecords method or insertTablet method
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  public void asyncInsertRecord(String deviceId, long time, List<String> measurements,
+      List<String> values) {
+    CompletableFuture<Void> timeout = failAfter(Duration.ofSeconds(ASYNC_TIMEOUT));
+    CompletableFuture<Void> asyncRun = CompletableFuture.supplyAsync(() -> {
+      try {
+        insertRecord(deviceId, time, measurements, values);
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        logger.error("Error occurred when inserting tablets: ", e);
+      }
+      return null;
+    }, threadPool.getThreadPool());
+
+    asyncRun.acceptEither(timeout, this::asyncHandler);
+  }
+
+  private static <T> CompletableFuture<T> failAfter(Duration duration) {
+    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+    CompletableFuture<T> promise = new CompletableFuture<>();
+    scheduler.schedule(() -> {
+      final TimeoutException ex = new TimeoutException("Timeout after " + duration);
+      return promise.completeExceptionally(ex);
+    }, duration.toMillis(), TimeUnit.MILLISECONDS);
+    return promise;
+  }
+
+  private void asyncHandler(Void aVoid) {
+    logger.info("Insertion executed successfully.");

Review comment:
       Change this to debug level and print the device and time.
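
Since `asyncHandler(Void)` has no access to the device or timestamp, one option is a lambda that captures them. The sketch below assumes `java.util.logging` (where `FINE` plays the role of debug) purely to stay self-contained; `InsertSuccessLogging` and `logOnSuccess` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper showing a debug-level success log with context.
class InsertSuccessLogging {
  static final Logger LOGGER = Logger.getLogger(InsertSuccessLogging.class.getName());

  // Attaches a success log that carries the device and timestamp of the insert.
  static CompletableFuture<Void> logOnSuccess(
      CompletableFuture<Void> asyncRun, String deviceId, long time) {
    return asyncRun.thenRun(() -> {
      if (LOGGER.isLoggable(Level.FINE)) {
        LOGGER.log(Level.FINE, "Insertion succeeded, device: {0}, time: {1}",
            new Object[] {deviceId, String.valueOf(time)});
      }
    });
  }
}
```

`thenRun` only fires on normal completion, so failures still flow to whatever error handling is attached to the returned future.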







[GitHub] [incubator-iotdb] jt2594838 edited a comment on pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
jt2594838 edited a comment on pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#issuecomment-641709191


   It is a bit weird to provide async methods without providing callbacks (even only formally). It may work but is not a very general design.
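
To make the point concrete, here is a minimal sketch of an async method that accepts a failure callback. The class `CallbackInsertSketch`, the method `asyncInsert`, and the `BiConsumer<String, Throwable>` callback shape are assumptions for illustration, not the interface this PR ended up with.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

// Hypothetical shape of an async insert with a user-supplied failure callback.
class CallbackInsertSketch {

  // Runs the insert on the executor and reports failures to onFailure, if given.
  static CompletableFuture<Void> asyncInsert(String deviceId, Runnable insert,
      Executor executor, BiConsumer<String, Throwable> onFailure) {
    return CompletableFuture.runAsync(insert, executor)
        .whenComplete((ignored, error) -> {
          if (error == null || onFailure == null) {
            return;
          }
          // Exceptions thrown by the task arrive wrapped in CompletionException.
          Throwable cause = (error instanceof CompletionException && error.getCause() != null)
              ? error.getCause() : error;
          onFailure.accept(deviceId, cause);
        });
  }
}
```

Returning the future as well as invoking the callback lets callers choose between the two styles.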





[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r439222598



##########
File path: session/src/main/java/org/apache/iotdb/session/pool/SessionThreadPool.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.session.Config;
+
+public class SessionThreadPool {
+
+  private ExecutorService pool;
+  private BlockingQueue<Runnable> threadQueue;
+  private static final int WAIT_TIMEOUT = 10000;

Review comment:
       Add a config option for this.
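
A possible shape for this, sketched under assumptions: the timeout is read from a JVM system property and falls back to the current 10000 ms default. The property name `iotdb.session.async.wait_timeout_ms` and the class `AsyncConfigSketch` are hypothetical; the project may prefer a field in `Config` or a constructor argument instead.

```java
// Hypothetical config lookup; the property name is illustrative only.
class AsyncConfigSketch {
  static final long DEFAULT_WAIT_TIMEOUT_MS = 10_000L;
  static final String WAIT_TIMEOUT_PROPERTY = "iotdb.session.async.wait_timeout_ms";

  // Returns the configured timeout, or the default for missing or invalid values.
  static long waitTimeoutMs() {
    String raw = System.getProperty(WAIT_TIMEOUT_PROPERTY);
    if (raw == null) {
      return DEFAULT_WAIT_TIMEOUT_MS;
    }
    try {
      long value = Long.parseLong(raw.trim());
      return value > 0 ? value : DEFAULT_WAIT_TIMEOUT_MS;
    } catch (NumberFormatException e) {
      return DEFAULT_WAIT_TIMEOUT_MS;
    }
  }
}
```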







[GitHub] [incubator-iotdb] jt2594838 commented on pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
jt2594838 commented on pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#issuecomment-641709191


   It is a bit weird to provide async methods without providing callbacks. It may work but is not a very general design.





[GitHub] [incubator-iotdb] HTHou commented on a change in pull request #1337: [IOTDB-746] Add async insertion interfaces in Session and SessionPool

Posted by GitBox <gi...@apache.org>.
HTHou commented on a change in pull request #1337:
URL: https://github.com/apache/incubator-iotdb/pull/1337#discussion_r448860998



##########
File path: docs/UserGuide/Client/Programming - Native API.md
##########
@@ -136,11 +145,11 @@ Here we show the commonly used interfaces and their parameters in the Native API
                        List<List<String>> measurementsList, List<List<String>> valuesList)
   ```
   
-* Insert a Record,which contains multiple measurement value of a device at a timestamp. With type info the server has no need to do type inference, which leads a better performance
+* Insert multiple Records asynchronously by providing additional asynchronous timeout and an exception callback

Review comment:
       ```suggestion
   * Insert multiple records asynchronously by providing an additional asynchronous timeout and an exception callback
   ```



