You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/08/19 07:59:30 UTC

[incubator-iotdb] branch master updated: implement rpc compression (#323)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fdc8d28  implement rpc compression (#323)
fdc8d28 is described below

commit fdc8d2833df3b535dc02333d744654f89ee6f5a7
Author: Ring-k <36...@users.noreply.github.com>
AuthorDate: Mon Aug 19 15:59:25 2019 +0800

    implement rpc compression (#323)
    
    * add rpc compression parameter
---
 jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java      |  2 ++
 .../main/java/org/apache/iotdb/jdbc/IoTDBConnection.java  | 15 +++++++++++++--
 .../src/assembly/resources/conf/iotdb-engine.properties   |  2 ++
 .../main/java/org/apache/iotdb/db/conf/IoTDBConfig.java   | 13 +++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBDescriptor.java    |  3 +++
 .../java/org/apache/iotdb/db/service/JDBCService.java     | 11 +++++++++--
 .../apache/iotdb/db/sync/receiver/SyncServerManager.java  | 11 +++++++++--
 .../org/apache/iotdb/db/sync/sender/SyncSenderImpl.java   | 10 +++++++++-
 8 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 84ed683..763af7d 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -52,4 +52,6 @@ public class Config {
 
   public static final String JDBC_DRIVER_NAME = "org.apache.iotdb.jdbc.IoTDBDriver";
 
+  public static boolean rpcThriftCompressionEnable = false;
+
 }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index e380f5c..c366c0c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TS_SessionHandle;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -89,7 +90,12 @@ public class IoTDBConnection implements Connection {
     supportedProtocols.add(TSProtocolVersion.TSFILE_SERVICE_PROTOCOL_V1);
 
     openTransport();
-    client = new TSIService.Client(new TBinaryProtocol(transport));
+    if(Config.rpcThriftCompressionEnable) {
+      client = new TSIService.Client(new TCompactProtocol(transport));
+    }
+    else {
+      client = new TSIService.Client(new TBinaryProtocol(transport));
+    }
     // open client session
     openSession();
     // Wrap the client with a thread-safe proxy to serialize the RPC calls
@@ -463,7 +469,12 @@ public class IoTDBConnection implements Connection {
         if (transport != null) {
           transport.close();
           openTransport();
-          client = new TSIService.Client(new TBinaryProtocol(transport));
+          if(Config.rpcThriftCompressionEnable) {
+            client = new TSIService.Client(new TCompactProtocol(transport));
+          }
+          else {
+            client = new TSIService.Client(new TBinaryProtocol(transport));
+          }
           openSession();
           client = newSynchronizedClient(client);
           flag = true;
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 7a67eeb..e870573 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -25,6 +25,8 @@ rpc_address=0.0.0.0
 
 rpc_port=6667
 
+rpc_thrift_compression_enable=false
+
 ####################
 ### Dynamic Parameter Adapter Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1a98a36..202b0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -39,6 +39,11 @@ public class IoTDBConfig {
   private String rpcAddress = "0.0.0.0";
 
   /**
+   * whether to use thrift compression.
+   */
+  private boolean rpcThriftCompressionEnable = false;
+
+  /**
    * Port which the JDBC server listens to.
    */
   private int rpcPort = 6667;
@@ -633,6 +638,14 @@ public class IoTDBConfig {
     this.memtableSizeThreshold = memtableSizeThreshold;
   }
 
+  public boolean isRpcThriftCompressionEnable() {
+    return rpcThriftCompressionEnable;
+  }
+
+  public void setRpcThriftCompressionEnable(boolean rpcThriftCompressionEnable) {
+    this.rpcThriftCompressionEnable = rpcThriftCompressionEnable;
+  }
+
   public boolean isMetaDataCacheEnable() {
     return metaDataCacheEnable;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 56e7811..c0b9849 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -120,6 +120,9 @@ public class IoTDBDescriptor {
 
       conf.setRpcAddress(properties.getProperty("rpc_address", conf.getRpcAddress()));
 
+      conf.setRpcThriftCompressionEnable(Boolean.parseBoolean(properties.getProperty("rpc_thrift_compression_enable",
+              Boolean.toString(conf.isRpcThriftCompressionEnable()))));
+
       conf.setRpcPort(Integer.parseInt(properties.getProperty("rpc_port",
           Integer.toString(conf.getRpcPort()))));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
index 09e7559..132ddeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.service.rpc.thrift.TSIService;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
@@ -49,7 +51,7 @@ public class JDBCService implements JDBCServiceMBean, IService {
       .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
           getID().getJmxName());
   private Thread jdbcServiceThread;
-  private Factory protocolFactory;
+  private TProtocolFactory protocolFactory;
   private Processor<TSIService.Iface> processor;
   private TThreadPoolServer.Args poolArgs;
   private TSServiceImpl impl;
@@ -188,7 +190,12 @@ public class JDBCService implements JDBCServiceMBean, IService {
 
     public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch)
         throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-      protocolFactory = new TBinaryProtocol.Factory();
+      if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+        protocolFactory = new TCompactProtocol.Factory();
+      }
+      else {
+        protocolFactory = new TBinaryProtocol.Factory();
+      }
       IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
       impl = (TSServiceImpl) Class.forName(config.getRpcImplClassName()).newInstance();
       processor = new TSIService.Processor<>(impl);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index e4e220b..65cea9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
@@ -98,7 +100,7 @@ public class SyncServerManager implements IService {
 
     private TServerSocket serverTransport;
     private TServer poolServer;
-    private Factory protocolFactory;
+    private TProtocolFactory protocolFactory;
     private Processor<SyncService.Iface> processor;
     private TThreadPoolServer.Args poolArgs;
 
@@ -111,7 +113,12 @@ public class SyncServerManager implements IService {
       try {
         serverTransport = new TServerSocket(
             new InetSocketAddress(conf.getRpcAddress(), conf.getSyncServerPort()));
-        protocolFactory = new TBinaryProtocol.Factory();
+        if(conf.isRpcThriftCompressionEnable()) {
+          protocolFactory = new TCompactProtocol.Factory();
+        }
+        else {
+          protocolFactory = new TBinaryProtocol.Factory();
+        }
         processor = new SyncService.Processor<>(new SyncServiceImpl());
         poolArgs = new TThreadPoolServer.Args(serverTransport);
         poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs,
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
index 88d6705..a340730 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/SyncSenderImpl.java
@@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.SyncConnectionException;
 import org.apache.iotdb.db.sync.conf.Constans;
 import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
@@ -55,6 +56,7 @@ import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
@@ -265,7 +267,13 @@ public class SyncSenderImpl implements SyncSender {
   @Override
   public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
     transport = new TSocket(serverIp, serverPort);
-    TProtocol protocol = new TBinaryProtocol(transport);
+    TProtocol protocol;
+    if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+      protocol = new TCompactProtocol(transport);
+    }
+    else {
+      protocol = new TBinaryProtocol(transport);
+    }
     serviceClient = new SyncService.Client(protocol);
     try {
       transport.open();