You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/08/19 07:59:30 UTC
[incubator-iotdb] branch master updated: implement rpc compression
(#323)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fdc8d28 implement rpc compression (#323)
fdc8d28 is described below
commit fdc8d2833df3b535dc02333d744654f89ee6f5a7
Author: Ring-k <36...@users.noreply.github.com>
AuthorDate: Mon Aug 19 15:59:25 2019 +0800
implement rpc compression (#323)
* add rpc compression parameter
---
jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java | 2 ++
.../main/java/org/apache/iotdb/jdbc/IoTDBConnection.java | 15 +++++++++++++--
.../src/assembly/resources/conf/iotdb-engine.properties | 2 ++
.../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | 3 +++
.../java/org/apache/iotdb/db/service/JDBCService.java | 11 +++++++++--
.../apache/iotdb/db/sync/receiver/SyncServerManager.java | 11 +++++++++--
.../org/apache/iotdb/db/sync/sender/SyncSenderImpl.java | 10 +++++++++-
8 files changed, 60 insertions(+), 7 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 84ed683..763af7d 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -52,4 +52,6 @@ public class Config {
public static final String JDBC_DRIVER_NAME = "org.apache.iotdb.jdbc.IoTDBDriver";
+ public static boolean rpcThriftCompressionEnable = false;
+
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index e380f5c..c366c0c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -89,7 +90,12 @@ public class IoTDBConnection implements Connection {
supportedProtocols.add(TSProtocolVersion.TSFILE_SERVICE_PROTOCOL_V1);
openTransport();
- client = new TSIService.Client(new TBinaryProtocol(transport));
+ if(Config.rpcThriftCompressionEnable) {
+ client = new TSIService.Client(new TCompactProtocol(transport));
+ }
+ else {
+ client = new TSIService.Client(new TBinaryProtocol(transport));
+ }
// open client session
openSession();
// Wrap the client with a thread-safe proxy to serialize the RPC calls
@@ -463,7 +469,12 @@ public class IoTDBConnection implements Connection {
if (transport != null) {
transport.close();
openTransport();
- client = new TSIService.Client(new TBinaryProtocol(transport));
+ if(Config.rpcThriftCompressionEnable) {
+ client = new TSIService.Client(new TCompactProtocol(transport));
+ }
+ else {
+ client = new TSIService.Client(new TBinaryProtocol(transport));
+ }
openSession();
client = newSynchronizedClient(client);
flag = true;
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 7a67eeb..e870573 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -25,6 +25,8 @@ rpc_address=0.0.0.0
rpc_port=6667
+rpc_thrift_compression_enable=false
+
####################
### Dynamic Parameter Adapter Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1a98a36..202b0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -39,6 +39,11 @@ public class IoTDBConfig {
private String rpcAddress = "0.0.0.0";
/**
+ * whether to use thrift compression.
+ */
+ private boolean rpcThriftCompressionEnable = false;
+
+ /**
* Port which the JDBC server listens to.
*/
private int rpcPort = 6667;
@@ -633,6 +638,14 @@ public class IoTDBConfig {
this.memtableSizeThreshold = memtableSizeThreshold;
}
+ public boolean isRpcThriftCompressionEnable() {
+ return rpcThriftCompressionEnable;
+ }
+
+ public void setRpcThriftCompressionEnable(boolean rpcThriftCompressionEnable) {
+ this.rpcThriftCompressionEnable = rpcThriftCompressionEnable;
+ }
+
public boolean isMetaDataCacheEnable() {
return metaDataCacheEnable;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 56e7811..c0b9849 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -120,6 +120,9 @@ public class IoTDBDescriptor {
conf.setRpcAddress(properties.getProperty("rpc_address", conf.getRpcAddress()));
+ conf.setRpcThriftCompressionEnable(Boolean.parseBoolean(properties.getProperty("rpc_thrift_compression_enable",
+ Boolean.toString(conf.isRpcThriftCompressionEnable()))));
+
conf.setRpcPort(Integer.parseInt(properties.getProperty("rpc_port",
Integer.toString(conf.getRpcPort()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
index 09e7559..132ddeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
@@ -49,7 +51,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
private Thread jdbcServiceThread;
- private Factory protocolFactory;
+ private TProtocolFactory protocolFactory;
private Processor<TSIService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
private TSServiceImpl impl;
@@ -188,7 +190,12 @@ public class JDBCService implements JDBCServiceMBean, IService {
public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
- protocolFactory = new TBinaryProtocol.Factory();
+ if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+ protocolFactory = new TCompactProtocol.Factory();
+ }
+ else {
+ protocolFactory = new TBinaryProtocol.Factory();
+ }
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
processor = new TSIService.Processor<>(impl);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index e4e220b..65cea9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
@@ -98,7 +100,7 @@ public class SyncServerManager implements IService {
private TServerSocket serverTransport;
private TServer poolServer;
- private Factory protocolFactory;
+ private TProtocolFactory protocolFactory;
private Processor<SyncService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
@@ -111,7 +113,12 @@ public class SyncServerManager implements IService {
try {
serverTransport = new TServerSocket(
new InetSocketAddress(conf.getRpcAddress(), conf.getSyncServerPort()));
- protocolFactory = new TBinaryProtocol.Factory();
+ if(conf.isRpcThriftCompressionEnable()) {
+ protocolFactory = new TCompactProtocol.Factory();
+ }
+ else {
+ protocolFactory = new TBinaryProtocol.Factory();
+ }
processor = new SyncService.Processor<>(new SyncServiceImpl());
poolArgs = new TThreadPoolServer.Args(serverTransport);
poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
index 88d6705..a340730 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
@@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.conf.Constans;
import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
@@ -55,6 +56,7 @@ import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
@@ -265,7 +267,13 @@ public class SyncSenderImpl implements SyncSender {
@Override
public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
transport = new TSocket(serverIp, serverPort);
- TProtocol protocol = new TBinaryProtocol(transport);
+ TProtocol protocol;
+ if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+ protocol = new TCompactProtocol(transport);
+ }
+ else {
+ protocol = new TBinaryProtocol(transport);
+ }
serviceClient = new SyncService.Client(protocol);
try {
transport.open();