You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/23 13:42:24 UTC

[incubator-iotdb] branch master updated: add rpc compression api in client and session module (#930)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d384cb5  add rpc compression api in client and session module (#930)
d384cb5 is described below

commit d384cb5cc5387c9965a7ec31b8239bc28272441c
Author: Ring-k <36...@users.noreply.github.com>
AuthorDate: Mon Mar 23 21:42:14 2020 +0800

    add rpc compression api in client and session module (#930)
    
    * add rpc compression api
---
 .../org/apache/iotdb/client/AbstractClient.java    | 21 +++++++++-----
 .../main/java/org/apache/iotdb/client/Client.java  |  3 ++
 .../java/org/apache/iotdb/client/WinClient.java    |  3 ++
 .../main/java/org/apache/iotdb/SessionExample.java | 20 +++++++------
 .../main/java/org/apache/iotdb/jdbc/Config.java    |  2 +-
 .../java/org/apache/iotdb/session/Session.java     | 33 +++++++++++++---------
 6 files changed, 52 insertions(+), 30 deletions(-)

diff --git a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
index fc77ade..3da35d7 100644
--- a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
+++ b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
@@ -69,6 +69,8 @@ public abstract class AbstractClient {
   static final List<String> AGGREGRATE_TIME_LIST = new ArrayList<>();
   static final String MAX_PRINT_ROW_COUNT_ARGS = "maxPRC";
   private static final String MAX_PRINT_ROW_COUNT_NAME = "maxPrintRowCount";
+  static final String RPC_COMPRESS_ARGS = "c";
+  static final String RPC_COMPRESS_NAME = "rpcCompressed";
   static final String SET_MAX_DISPLAY_NUM = "set max_display_num";
   static final String SET_TIMESTAMP_DISPLAY = "set time_display_type";
   static final String SHOW_TIMESTAMP_DISPLAY = "show time_display_type";
@@ -121,6 +123,7 @@ public abstract class AbstractClient {
     keywordSet.add("-" + EXECUTE_ARGS);
     keywordSet.add("-" + ISO8601_ARGS);
     keywordSet.add("-" + MAX_PRINT_ROW_COUNT_ARGS);
+    keywordSet.add("-" + RPC_COMPRESS_ARGS);
   }
 
 
@@ -129,10 +132,9 @@ public abstract class AbstractClient {
   }
 
   private static void printCount(int cnt) {
-    if(cnt == 0){
+    if (cnt == 0) {
       println("Empty set.");
-    }
-    else {
+    } else {
       println("Total line number = " + cnt);
     }
   }
@@ -175,6 +177,11 @@ public abstract class AbstractClient {
         .argName(MAX_PRINT_ROW_COUNT_NAME).hasArg()
         .desc("Maximum number of rows displayed (optional)").build();
     options.addOption(maxPrintCount);
+
+    Option isRpcCompressed = Option.builder(RPC_COMPRESS_ARGS)
+        .argName(RPC_COMPRESS_NAME)
+        .desc("Rpc Compression enabled or not").build();
+    options.addOption(isRpcCompressed);
     return options;
   }
 
@@ -565,11 +572,11 @@ public abstract class AbstractClient {
   /**
    * cache all results
    *
-   * @param resultSet jdbc resultSet
-   * @param maxSizeList the longest result of every column
-   * @param columnCount the number of column
+   * @param resultSet         jdbc resultSet
+   * @param maxSizeList       the longest result of every column
+   * @param columnCount       the number of column
    * @param resultSetMetaData jdbc resultSetMetaData
-   * @param zoneId your time zone
+   * @param zoneId            your time zone
    * @return List<List < String>> result
    * @throws SQLException throw exception
    */
diff --git a/client/src/main/java/org/apache/iotdb/client/Client.java b/client/src/main/java/org/apache/iotdb/client/Client.java
index 0137e9a..0efcf0c 100644
--- a/client/src/main/java/org/apache/iotdb/client/Client.java
+++ b/client/src/main/java/org/apache/iotdb/client/Client.java
@@ -83,6 +83,9 @@ public class Client extends AbstractClient {
         hf.printHelp(SCRIPT_HINT, options, true);
         return false;
       }
+      if (commandLine.hasOption(RPC_COMPRESS_ARGS)) {
+        Config.rpcThriftCompressionEnable = true;
+      }
       if (commandLine.hasOption(ISO8601_ARGS)) {
         setTimeFormat("long");
       }
diff --git a/client/src/main/java/org/apache/iotdb/client/WinClient.java b/client/src/main/java/org/apache/iotdb/client/WinClient.java
index 0cec7f1..8be3d40 100644
--- a/client/src/main/java/org/apache/iotdb/client/WinClient.java
+++ b/client/src/main/java/org/apache/iotdb/client/WinClient.java
@@ -88,6 +88,9 @@ public class WinClient extends AbstractClient {
         hf.printHelp(IOTDB_CLI_PREFIX, options, true);
         return false;
       }
+      if (commandLine.hasOption(RPC_COMPRESS_ARGS)) {
+        Config.rpcThriftCompressionEnable = true;
+      }
       if (commandLine.hasOption(ISO8601_ARGS)) {
         setTimeFormat("long");
       }
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 5304cde..1ef387a 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -44,17 +44,20 @@ public class SessionExample {
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException {
     session = new Session("127.0.0.1", 6667, "root", "root");
-    session.open();
+    session.open(false);
 
     session.setStorageGroup("root.sg1");
     if (session.checkTimeseriesExists("root.sg1.d1.s1")) {
-      session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+      session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE,
+          CompressionType.SNAPPY);
     }
     if (session.checkTimeseriesExists("root.sg1.d1.s2")) {
-      session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+      session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE,
+          CompressionType.SNAPPY);
     }
     if (session.checkTimeseriesExists("root.sg1.d1.s3")) {
-      session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+      session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE,
+          CompressionType.SNAPPY);
     }
 
     insert();
@@ -82,7 +85,8 @@ public class SessionExample {
     }
   }
 
-  private static void insertInObject() throws IoTDBConnectionException, StatementExecutionException {
+  private static void insertInObject()
+      throws IoTDBConnectionException, StatementExecutionException {
     String deviceId = "root.sg1.d1";
     List<String> measurements = new ArrayList<>();
     measurements.add("s1");
@@ -127,7 +131,8 @@ public class SessionExample {
   }
 
   /**
-   * insert a batch data of one device, each batch contains multiple timestamps with values of sensors
+   * insert a batch data of one device, each batch contains multiple timestamps with values of
+   * sensors
    *
    * a RowBatch example:
    *
@@ -138,7 +143,6 @@ public class SessionExample {
    * 3,   3,  3,  3
    *
    * Users need to control the count of RowBatch and write a batch when it reaches the maxBatchSize
-   *
    */
   private static void insertRowBatch() throws IoTDBConnectionException, BatchExecutionException {
     // The schema of sensors of one device
@@ -243,7 +247,7 @@ public class SessionExample {
     dataSet = session.executeQueryStatement("select * from root.sg1.d1");
     System.out.println(dataSet.getColumnNames());
     dataSet.setBatchSize(1024); // default is 512
-    while (dataSet.hasNext()){
+    while (dataSet.hasNext()) {
       System.out.println(dataSet.next());
     }
 
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 54d52a8..2cd4ff8 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -52,6 +52,6 @@ public class Config {
 
   public static final String JDBC_DRIVER_NAME = "org.apache.iotdb.jdbc.IoTDBDriver";
 
-  static boolean rpcThriftCompressionEnable = false;
+  public static boolean rpcThriftCompressionEnable = false;
 
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 4570807..fb9b8c6 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -105,6 +105,10 @@ public class Session {
     open(false, Config.DEFAULT_TIMEOUT_MS);
   }
 
+  public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException {
+    open(enableRPCCompression, Config.DEFAULT_TIMEOUT_MS);
+  }
+
   private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException {
     if (!isClosed) {
@@ -184,11 +188,12 @@ public class Session {
 
   /**
    * check whether the batch has been sorted
+   *
    * @return whether the batch has been sorted
    */
-  private boolean checkSorted(RowBatch rowBatch){
+  private boolean checkSorted(RowBatch rowBatch) {
     for (int i = 1; i < rowBatch.batchSize; i++) {
-      if(rowBatch.timestamps[i] < rowBatch.timestamps[i - 1]){
+      if (rowBatch.timestamps[i] < rowBatch.timestamps[i - 1]) {
         return false;
       }
     }
@@ -222,15 +227,15 @@ public class Session {
   }
 
   /**
-   * use batch interface to insert sorted data
-   * times in row batch must be sorted before!
+   * use batch interface to insert sorted data times in row batch must be sorted before!
    *
    * @param rowBatch data batch
    */
   public void insertSortedBatch(RowBatch rowBatch)
       throws BatchExecutionException, IoTDBConnectionException {
-    if(!checkSorted(rowBatch)){
-      throw new BatchExecutionException("Row batch has't been sorted when calling insertSortedBatch");
+    if (!checkSorted(rowBatch)) {
+      throw new BatchExecutionException(
+          "Row batch has't been sorted when calling insertSortedBatch");
     }
     insertSortedBatchIntern(rowBatch);
   }
@@ -241,22 +246,22 @@ public class Session {
    * @param rowBatchMap data batch in multiple device
    */
   public void insertMultipleDeviceBatch
-      (Map<String, RowBatch> rowBatchMap) throws IoTDBConnectionException, BatchExecutionException {
-    for(Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()){
+  (Map<String, RowBatch> rowBatchMap) throws IoTDBConnectionException, BatchExecutionException {
+    for (Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()) {
       sortRowBatch(dataInOneDevice.getValue());
       insertBatch(dataInOneDevice.getValue());
     }
   }
 
   /**
-   * use batch interface to insert sorted data in multiple device
-   * times in row batch must be sorted before!
+   * use batch interface to insert sorted data in multiple device times in row batch must be sorted
+   * before!
    *
    * @param rowBatchMap data batch in multiple device
    */
   public void insertMultipleDeviceSortedBatch
   (Map<String, RowBatch> rowBatchMap) throws IoTDBConnectionException, BatchExecutionException {
-    for(Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()){
+    for (Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()) {
       checkSorted(dataInOneDevice.getValue());
       insertSortedBatchIntern(dataInOneDevice.getValue());
     }
@@ -275,7 +280,7 @@ public class Session {
     insertSortedBatchIntern(rowBatch);
   }
 
-  private void sortRowBatch(RowBatch rowBatch){
+  private void sortRowBatch(RowBatch rowBatch) {
     /*
      * following part of code sort the batch data by time,
      * so we can insert continuous data in value list to get a better performance
@@ -297,8 +302,8 @@ public class Session {
    * sort value list by index
    *
    * @param valueList value list
-   * @param dataType data type
-   * @param index index
+   * @param dataType  data type
+   * @param index     index
    * @return sorted list
    */
   private Object sortList(Object valueList, TSDataType dataType, Integer[] index) {