You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/23 13:42:24 UTC
[incubator-iotdb] branch master updated: add rpc compression api in client and session module (#930)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d384cb5 add rpc compression api in client and session module (#930)
d384cb5 is described below
commit d384cb5cc5387c9965a7ec31b8239bc28272441c
Author: Ring-k <36...@users.noreply.github.com>
AuthorDate: Mon Mar 23 21:42:14 2020 +0800
add rpc compression api in client and session module (#930)
* add rpc compression api
---
.../org/apache/iotdb/client/AbstractClient.java | 21 +++++++++-----
.../main/java/org/apache/iotdb/client/Client.java | 3 ++
.../java/org/apache/iotdb/client/WinClient.java | 3 ++
.../main/java/org/apache/iotdb/SessionExample.java | 20 +++++++------
.../main/java/org/apache/iotdb/jdbc/Config.java | 2 +-
.../java/org/apache/iotdb/session/Session.java | 33 +++++++++++++---------
6 files changed, 52 insertions(+), 30 deletions(-)
diff --git a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
index fc77ade..3da35d7 100644
--- a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
+++ b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
@@ -69,6 +69,8 @@ public abstract class AbstractClient {
static final List<String> AGGREGRATE_TIME_LIST = new ArrayList<>();
static final String MAX_PRINT_ROW_COUNT_ARGS = "maxPRC";
private static final String MAX_PRINT_ROW_COUNT_NAME = "maxPrintRowCount";
+ static final String RPC_COMPRESS_ARGS = "c";
+ static final String RPC_COMPRESS_NAME = "rpcCompressed";
static final String SET_MAX_DISPLAY_NUM = "set max_display_num";
static final String SET_TIMESTAMP_DISPLAY = "set time_display_type";
static final String SHOW_TIMESTAMP_DISPLAY = "show time_display_type";
@@ -121,6 +123,7 @@ public abstract class AbstractClient {
keywordSet.add("-" + EXECUTE_ARGS);
keywordSet.add("-" + ISO8601_ARGS);
keywordSet.add("-" + MAX_PRINT_ROW_COUNT_ARGS);
+ keywordSet.add("-" + RPC_COMPRESS_ARGS);
}
@@ -129,10 +132,9 @@ public abstract class AbstractClient {
}
private static void printCount(int cnt) {
- if(cnt == 0){
+ if (cnt == 0) {
println("Empty set.");
- }
- else {
+ } else {
println("Total line number = " + cnt);
}
}
@@ -175,6 +177,11 @@ public abstract class AbstractClient {
.argName(MAX_PRINT_ROW_COUNT_NAME).hasArg()
.desc("Maximum number of rows displayed (optional)").build();
options.addOption(maxPrintCount);
+
+ Option isRpcCompressed = Option.builder(RPC_COMPRESS_ARGS)
+ .argName(RPC_COMPRESS_NAME)
+ .desc("Rpc Compression enabled or not").build();
+ options.addOption(isRpcCompressed);
return options;
}
@@ -565,11 +572,11 @@ public abstract class AbstractClient {
/**
* cache all results
*
- * @param resultSet jdbc resultSet
- * @param maxSizeList the longest result of every column
- * @param columnCount the number of column
+ * @param resultSet jdbc resultSet
+ * @param maxSizeList the longest result of every column
+ * @param columnCount the number of column
* @param resultSetMetaData jdbc resultSetMetaData
- * @param zoneId your time zone
+ * @param zoneId your time zone
* @return List<List < String>> result
* @throws SQLException throw exception
*/
diff --git a/client/src/main/java/org/apache/iotdb/client/Client.java b/client/src/main/java/org/apache/iotdb/client/Client.java
index 0137e9a..0efcf0c 100644
--- a/client/src/main/java/org/apache/iotdb/client/Client.java
+++ b/client/src/main/java/org/apache/iotdb/client/Client.java
@@ -83,6 +83,9 @@ public class Client extends AbstractClient {
hf.printHelp(SCRIPT_HINT, options, true);
return false;
}
+ if (commandLine.hasOption(RPC_COMPRESS_ARGS)) {
+ Config.rpcThriftCompressionEnable = true;
+ }
if (commandLine.hasOption(ISO8601_ARGS)) {
setTimeFormat("long");
}
diff --git a/client/src/main/java/org/apache/iotdb/client/WinClient.java b/client/src/main/java/org/apache/iotdb/client/WinClient.java
index 0cec7f1..8be3d40 100644
--- a/client/src/main/java/org/apache/iotdb/client/WinClient.java
+++ b/client/src/main/java/org/apache/iotdb/client/WinClient.java
@@ -88,6 +88,9 @@ public class WinClient extends AbstractClient {
hf.printHelp(IOTDB_CLI_PREFIX, options, true);
return false;
}
+ if (commandLine.hasOption(RPC_COMPRESS_ARGS)) {
+ Config.rpcThriftCompressionEnable = true;
+ }
if (commandLine.hasOption(ISO8601_ARGS)) {
setTimeFormat("long");
}
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 5304cde..1ef387a 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -44,17 +44,20 @@ public class SessionExample {
public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
- session.open();
+ session.open(false);
session.setStorageGroup("root.sg1");
if (session.checkTimeseriesExists("root.sg1.d1.s1")) {
- session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE,
+ CompressionType.SNAPPY);
}
if (session.checkTimeseriesExists("root.sg1.d1.s2")) {
- session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.s2", TSDataType.INT64, TSEncoding.RLE,
+ CompressionType.SNAPPY);
}
if (session.checkTimeseriesExists("root.sg1.d1.s3")) {
- session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+ session.createTimeseries("root.sg1.d1.s3", TSDataType.INT64, TSEncoding.RLE,
+ CompressionType.SNAPPY);
}
insert();
@@ -82,7 +85,8 @@ public class SessionExample {
}
}
- private static void insertInObject() throws IoTDBConnectionException, StatementExecutionException {
+ private static void insertInObject()
+ throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
@@ -127,7 +131,8 @@ public class SessionExample {
}
/**
- * insert a batch data of one device, each batch contains multiple timestamps with values of sensors
+ * insert a batch data of one device, each batch contains multiple timestamps with values of
+ * sensors
*
* a RowBatch example:
*
@@ -138,7 +143,6 @@ public class SessionExample {
* 3, 3, 3, 3
*
* Users need to control the count of RowBatch and write a batch when it reaches the maxBatchSize
- *
*/
private static void insertRowBatch() throws IoTDBConnectionException, BatchExecutionException {
// The schema of sensors of one device
@@ -243,7 +247,7 @@ public class SessionExample {
dataSet = session.executeQueryStatement("select * from root.sg1.d1");
System.out.println(dataSet.getColumnNames());
dataSet.setBatchSize(1024); // default is 512
- while (dataSet.hasNext()){
+ while (dataSet.hasNext()) {
System.out.println(dataSet.next());
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 54d52a8..2cd4ff8 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -52,6 +52,6 @@ public class Config {
public static final String JDBC_DRIVER_NAME = "org.apache.iotdb.jdbc.IoTDBDriver";
- static boolean rpcThriftCompressionEnable = false;
+ public static boolean rpcThriftCompressionEnable = false;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 4570807..fb9b8c6 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -105,6 +105,10 @@ public class Session {
open(false, Config.DEFAULT_TIMEOUT_MS);
}
+ public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException {
+ open(enableRPCCompression, Config.DEFAULT_TIMEOUT_MS);
+ }
+
private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException {
if (!isClosed) {
@@ -184,11 +188,12 @@ public class Session {
/**
* check whether the batch has been sorted
+ *
* @return whether the batch has been sorted
*/
- private boolean checkSorted(RowBatch rowBatch){
+ private boolean checkSorted(RowBatch rowBatch) {
for (int i = 1; i < rowBatch.batchSize; i++) {
- if(rowBatch.timestamps[i] < rowBatch.timestamps[i - 1]){
+ if (rowBatch.timestamps[i] < rowBatch.timestamps[i - 1]) {
return false;
}
}
@@ -222,15 +227,15 @@ public class Session {
}
/**
- * use batch interface to insert sorted data
- * times in row batch must be sorted before!
+ * use batch interface to insert sorted data times in row batch must be sorted before!
*
* @param rowBatch data batch
*/
public void insertSortedBatch(RowBatch rowBatch)
throws BatchExecutionException, IoTDBConnectionException {
- if(!checkSorted(rowBatch)){
- throw new BatchExecutionException("Row batch has't been sorted when calling insertSortedBatch");
+ if (!checkSorted(rowBatch)) {
+ throw new BatchExecutionException(
+ "Row batch has't been sorted when calling insertSortedBatch");
}
insertSortedBatchIntern(rowBatch);
}
@@ -241,22 +246,22 @@ public class Session {
* @param rowBatchMap data batch in multiple device
*/
public void insertMultipleDeviceBatch
- (Map<String, RowBatch> rowBatchMap) throws IoTDBConnectionException, BatchExecutionException {
- for(Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()){
+ (Map<String, RowBatch> rowBatchMap) throws IoTDBConnectionException, BatchExecutionException {
+ for (Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()) {
sortRowBatch(dataInOneDevice.getValue());
insertBatch(dataInOneDevice.getValue());
}
}
/**
- * use batch interface to insert sorted data in multiple device
- * times in row batch must be sorted before!
+ * use batch interface to insert sorted data in multiple device times in row batch must be sorted
+ * before!
*
* @param rowBatchMap data batch in multiple device
*/
public void insertMultipleDeviceSortedBatch
(Map<String, RowBatch> rowBatchMap) throws IoTDBConnectionException, BatchExecutionException {
- for(Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()){
+ for (Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()) {
checkSorted(dataInOneDevice.getValue());
insertSortedBatchIntern(dataInOneDevice.getValue());
}
@@ -275,7 +280,7 @@ public class Session {
insertSortedBatchIntern(rowBatch);
}
- private void sortRowBatch(RowBatch rowBatch){
+ private void sortRowBatch(RowBatch rowBatch) {
/*
* following part of code sort the batch data by time,
* so we can insert continuous data in value list to get a better performance
@@ -297,8 +302,8 @@ public class Session {
* sort value list by index
*
* @param valueList value list
- * @param dataType data type
- * @param index index
+ * @param dataType data type
+ * @param index index
* @return sorted list
*/
private Object sortList(Object valueList, TSDataType dataType, Integer[] index) {