You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/01 06:53:00 UTC

[iotdb] branch cluster_add_snappy updated: refactor transport factory

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_add_snappy by this push:
     new 0a42b77  refactor transport factory
0a42b77 is described below

commit 0a42b7716cf4dc99eb246505db11aee9e5fc4208
Author: jt <jt...@163.com>
AuthorDate: Tue Dec 1 14:51:31 2020 +0800

    refactor transport factory
---
 cluster/src/assembly/resources/conf/logback.xml    |  9 ----
 .../java/org/apache/iotdb/cluster/ClientMain.java  |  4 +-
 .../iotdb/cluster/client/sync/SyncDataClient.java  |  9 ++--
 .../client/sync/SyncDataHeartbeatClient.java       |  4 +-
 .../iotdb/cluster/client/sync/SyncMetaClient.java  |  4 +-
 .../client/sync/SyncMetaHeartbeatClient.java       |  4 +-
 .../apache/iotdb/cluster/server/ClientServer.java  |  6 +--
 .../apache/iotdb/cluster/server/RaftServer.java    |  6 +--
 .../cluster/server/heartbeat/HeartbeatServer.java  |  7 +--
 .../iotdb/cluster/server/member/RaftMember.java    |  2 +-
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |  7 +--
 .../TimeoutChangeableTFastFramedTransportTest.java | 24 ----------
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     | 16 +++++--
 .../db/service/thrift/ThriftServiceThread.java     |  4 +-
 .../iotdb/db/sync/sender/transfer/SyncClient.java  |  5 +-
 .../org/apache/iotdb/rpc/AutoExpandingBuffer.java  | 53 ++++++++++++++++++++++
 .../iotdb/rpc}/AutoScalingBufferReadTransport.java | 21 +++------
 .../rpc}/AutoScalingBufferWriteTransport.java      | 26 ++++-------
 .../org/apache/iotdb/rpc/RpcTransportFactory.java  | 34 ++++++++------
 .../apache/iotdb/rpc}/TElasticFramedTransport.java | 10 ++--
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   | 18 ++++----
 .../TimeoutChangeableTFastFramedTransport.java     | 16 +++++--
 .../TimeoutChangeableTSnappyFramedTransport.java   | 17 +++++--
 .../iotdb/rpc/TimeoutChangeableTransport.java      | 20 +++-----
 .../apache/iotdb/session/SessionConnection.java    |  4 +-
 25 files changed, 173 insertions(+), 157 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/logback.xml b/cluster/src/assembly/resources/conf/logback.xml
index 4b641f5..e81e6e5 100644
--- a/cluster/src/assembly/resources/conf/logback.xml
+++ b/cluster/src/assembly/resources/conf/logback.xml
@@ -27,7 +27,6 @@
             <onMismatch>DENY</onMismatch>
         </filter>
     </appender>
-
     <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_WARN">
         <file>${IOTDB_HOME}/logs/log_warn.log</file>
         <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -49,7 +48,6 @@
             <onMismatch>DENY</onMismatch>
         </filter>
     </appender>
-
     <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_INFO">
         <file>${IOTDB_HOME}/logs/log_info.log</file>
         <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -71,7 +69,6 @@
             <onMismatch>DENY</onMismatch>
         </filter>
     </appender>
-
     <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_DEBUG">
         <file>${IOTDB_HOME}/logs/log_debug.log</file>
         <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -93,7 +90,6 @@
             <onMismatch>DENY</onMismatch>
         </filter>
     </appender>
-
     <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_TRACE">
         <file>${IOTDB_HOME}/logs/log_trace.log</file>
         <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -115,7 +111,6 @@
             <onMismatch>DENY</onMismatch>
         </filter>
     </appender>
-
     <appender class="ch.qos.logback.core.ConsoleAppender" name="FILE_STDOUT">
         <Target>System.out</Target>
         <encoder>
@@ -126,7 +121,6 @@
             <level>INFO</level>
         </filter>
     </appender>
-
     <logger level="info" name="org.apache.iotdb.db.service"/>
     <logger level="info" name="org.apache.iotdb.db.conf"/>
     <!-- a log appender that collect all log records whose level is greather than debug-->
@@ -149,7 +143,6 @@
             <level>DEBUG</level>
         </filter>
     </appender>
-
     <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_COST_MEASURE">
         <file>${IOTDB_HOME}/logs/log_measure.log</file>
         <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -169,7 +162,6 @@
             <level>INFO</level>
         </filter>
     </appender>
-
     <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_DYNAMIC_PARAMETER">
         <file>${IOTDB_HOME}/logs/log_dynamic_adapter.log</file>
         <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -189,7 +181,6 @@
             <level>DEBUG</level>
         </filter>
     </appender>
-
     <appender name="ASYNC_FILE_ERROR" class="ch.qos.logback.classic.AsyncAppender">
         <queueSize>1024</queueSize>
         <neverBlock>true</neverBlock>
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index c339344..5da4e52 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -36,11 +36,11 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -280,7 +280,7 @@ public class ClientMain {
   @SuppressWarnings({"java:S2095"}) // the transport is used later
   private static Client getClient(String ip, int port) throws TTransportException {
     TSIService.Client.Factory factory = new Factory();
-    TTransport transport = new TElasticFramedTransport(new TSocket(ip, port));
+    TTransport transport = RpcTransportFactory.INSTANCE.getTransport(new TSocket(ip, port));
     transport.open();
     TProtocol protocol =
         IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable() ?
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index 9dd4f48..23e084c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -20,11 +20,12 @@
 package org.apache.iotdb.cluster.client.sync;
 
 import java.net.SocketException;
-import org.apache.iotdb.cluster.client.rpcutils.TimeoutChangeableTFastFramedTransport;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
@@ -48,7 +49,7 @@ public class SyncDataClient extends Client {
   public SyncDataClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
       throws TTransportException {
     // the difference of the two clients lies in the port
-    super(protocolFactory.getProtocol(new TimeoutChangeableTFastFramedTransport(
+    super(protocolFactory.getProtocol(RpcTransportFactory.INSTANCE.getTransport(
         new TSocket(node.getIp(), node.getDataPort(), RaftServer.getConnectionTimeoutInMS()))));
     this.node = node;
     this.pool = pool;
@@ -57,13 +58,13 @@ public class SyncDataClient extends Client {
 
   public void setTimeout(int timeout) {
     // the same transport is used in both input and output
-    ((TimeoutChangeableTFastFramedTransport) (getInputProtocol().getTransport()))
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport()))
         .setTimeout(timeout);
   }
 
   @TestOnly
   public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTFastFramedTransport) getInputProtocol().getTransport()).getTimeOut();
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   public void putBack() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index d9b40ad..134ec11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@ -19,10 +19,10 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
-import org.apache.iotdb.cluster.client.rpcutils.TimeoutChangeableTFastFramedTransport;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
@@ -36,7 +36,7 @@ public class SyncDataHeartbeatClient extends SyncDataClient {
   private SyncDataHeartbeatClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
       throws TTransportException {
     // the difference of the two clients lies in the port
-    super(protocolFactory.getProtocol(new TimeoutChangeableTFastFramedTransport(
+    super(protocolFactory.getProtocol(RpcTransportFactory.INSTANCE.getTransport(
         new TSocket(node.getIp(), node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
             RaftServer.getConnectionTimeoutInMS()))));
     this.node = node;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index 7cec0f0..6e4d450 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -19,10 +19,10 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
 import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
@@ -45,7 +45,7 @@ public class SyncMetaClient extends Client {
 
   public SyncMetaClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
       throws TTransportException {
-    super(protocolFactory.getProtocol(new TElasticFramedTransport(
+    super(protocolFactory.getProtocol(RpcTransportFactory.INSTANCE.getTransport(
         new TSocket(node.getIp(), node.getMetaPort(), RaftServer.getConnectionTimeoutInMS()))));
     this.node = node;
     this.pool = pool;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index 4326cf9..dddc66f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@ -19,10 +19,10 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
-import org.apache.iotdb.cluster.client.rpcutils.TimeoutChangeableTFastFramedTransport;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
@@ -36,7 +36,7 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient {
   private SyncMetaHeartbeatClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
       throws TTransportException {
     // the difference of the two clients lies in the port
-    super(protocolFactory.getProtocol(new TimeoutChangeableTFastFramedTransport(
+    super(protocolFactory.getProtocol(RpcTransportFactory.INSTANCE.getTransport(
         new TSocket(node.getIp(), node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
             RaftServer.getConnectionTimeoutInMS()))));
     this.node = node;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 4c82c5b..f0c554e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -55,6 +54,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.TSServiceImpl;
 import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
@@ -164,9 +164,7 @@ public class ClientServer extends TSServiceImpl {
     poolArgs.processor(new Processor<>(this));
     poolArgs.protocolFactory(protocolFactory);
     // nonblocking server requests FramedTransport
-    poolArgs.transportFactory(new TElasticFramedTransport.Factory(
-        IoTDBDescriptor.getInstance().getConfig().getThriftInitBufferSize(),
-        IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()));
+    poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
 
     poolServer = new TThreadPoolServer(poolArgs);
     // mainly for handling client exit events
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 0f2363e..fe0cc63 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -25,7 +25,6 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -35,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -202,9 +202,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
     poolArgs.processor(getProcessor());
     poolArgs.protocolFactory(protocolFactory);
     // async service requires FramedTransport
-    poolArgs.transportFactory(new TElasticFramedTransport.Factory(
-        IoTDBDescriptor.getInstance().getConfig().getThriftInitBufferSize(),
-        IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()));
+    poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
 
     // run the thrift server in a separate thread so that the main thread is not blocked
     return new TThreadedSelectorServer(poolArgs);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
index f3d494f..e236e1c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
@@ -25,14 +25,13 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -190,9 +189,7 @@ public abstract class HeartbeatServer {
     poolArgs.processor(getProcessor());
     poolArgs.protocolFactory(heartbeatProtocolFactory);
     // async service requires FramedTransport
-    poolArgs.transportFactory(new TElasticFramedTransport.Factory(
-        IoTDBDescriptor.getInstance().getConfig().getThriftInitBufferSize(),
-        IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()));
+    poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
 
     return new THsHaServer(poolArgs);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 555c0a3..245b135 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -107,7 +107,7 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("java:S3077") // reference volatile is enough
 public abstract class RaftMember {
 
-  public static final boolean USE_LOG_DISPATCHER = true;
+  public static final boolean USE_LOG_DISPATCHER = false;
 
   private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
   private static final String MSG_FORWARD_ERROR = "{}: encountered an error when forwarding {} to"
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 8346ddc..395ce03 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
@@ -37,11 +36,11 @@ import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
@@ -253,9 +252,7 @@ public class ClusterUtils {
     poolArgs.processor(processor);
     poolArgs.protocolFactory(protocolFactory);
     // async service requires FramedTransport
-    poolArgs.transportFactory(new TElasticFramedTransport.Factory(
-        IoTDBDescriptor.getInstance().getConfig().getThriftInitBufferSize(),
-        IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()));
+    poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
 
     // run the thrift server in a separate thread so that the main thread is not blocked
     return new TThreadPoolServer(poolArgs);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransportTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransportTest.java
deleted file mode 100644
index 3b00a88..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransportTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- *   * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless  [...]
- */
-
-package org.apache.iotdb.cluster.client.rpcutils;
-
-import static org.junit.Assert.*;
-
-import java.net.SocketException;
-import org.apache.thrift.transport.TSocket;
-import org.junit.Test;
-
-public class TimeoutChangeableTFastFramedTransportTest {
-
-  @Test
-  public void test() throws SocketException {
-    TimeoutChangeableTFastFramedTransport transport;
-    transport = new TimeoutChangeableTFastFramedTransport(new TSocket("localhost", 6667));
-
-    assertEquals(0, transport.getTimeOut());
-    transport.setTimeout(1000);
-    assertEquals(1000, transport.getTimeOut());
-  }
-}
\ No newline at end of file
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 85464cd..1f954e3 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -38,13 +38,20 @@ import java.time.ZoneId;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -407,8 +414,9 @@ public class IoTDBConnection implements Connection {
   }
 
   private void openTransport() throws TTransportException {
-    transport = new TFastFramedTransport(new TSocket(params.getHost(), params.getPort(),
-        Config.connectionTimeoutInMs));
+    transport = RpcTransportFactory.INSTANCE
+        .getTransport(new TSocket(params.getHost(), params.getPort(),
+            Config.connectionTimeoutInMs));
     if (!transport.isOpen()) {
       transport.open();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index 7f650d3..51fabac 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.runtime.RPCServiceException;
 import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -32,7 +33,6 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TServerEventHandler;
 import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -73,7 +73,7 @@ public class ThriftServiceThread extends Thread {
           threadsName);
       poolArgs.processor(processor);
       poolArgs.protocolFactory(protocolFactory);
-      poolArgs.transportFactory(new TFastFramedTransport.Factory());
+      poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
       poolServer = new TThreadPoolServer(poolArgs);
       poolServer.setServerEventHandler(serverEventHandler);
     } catch (TTransportException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index a0e16ff..8da4491 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncStatus;
@@ -75,7 +76,6 @@ import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -273,7 +273,8 @@ public class SyncClient implements ISyncClient {
 
   @Override
   public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
-    transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
+    transport = RpcTransportFactory.INSTANCE
+        .getTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
     TProtocol protocol = new TBinaryProtocol(transport);
     serviceClient = new SyncService.Client(protocol);
     try {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
new file mode 100644
index 0000000..52490eb
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.rpc;
+
+import java.util.Arrays;
+
+/**
+ * Helper class that wraps a byte[] so that it can expand and be reused. Users
+ * should call resizeIfNecessary to make sure the buffer has suitable capacity,
+ * and then use the array as needed. Note that the internal array will grow at a
+ * rate slightly faster than the requested capacity with the (untested)
+ * objective of avoiding expensive buffer allocations and copies.
+ */
+class AutoExpandingBuffer {
+  private byte[] array;
+
+  public AutoExpandingBuffer(int initialCapacity) {
+    this.array = new byte[initialCapacity];
+  }
+
+  public void resizeIfNecessary(int size) {
+    final int currentCapacity = this.array.length;
+    final double loadFactor = 0.3;
+    if (currentCapacity < size) {
+      // Increase by a factor of 1.5x
+      int growCapacity = currentCapacity + (currentCapacity >> 1);
+      int newCapacity = Math.max(growCapacity, size);
+      this.array = Arrays.copyOf(array, newCapacity);
+    } else if (array.length * loadFactor > size) {
+      array = Arrays.copyOf(array, size);
+    }
+  }
+
+  public byte[] array() {
+    return this.array;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
similarity index 64%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
index 7a564e9..085e9e8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
@@ -17,29 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
 
 import org.apache.thrift.transport.AutoExpandingBufferReadTransport;
 
 public class AutoScalingBufferReadTransport extends AutoExpandingBufferReadTransport {
 
-  private final AutoScalingBuffer buf;
+  private final AutoExpandingBuffer buf;
 
-  public AutoScalingBufferReadTransport(int initialCapacity, double overgrowthCoefficient) {
-    super(initialCapacity, overgrowthCoefficient);
-    this.buf = new AutoScalingBuffer(initialCapacity, overgrowthCoefficient);
+  public AutoScalingBufferReadTransport(int initialCapacity) {
+    super(initialCapacity);
+    this.buf = new AutoExpandingBuffer(initialCapacity);
   }
 
-  /**
-   * shrink the buffer to the specific size
-   *
-   * @param size The size of the target you want to shrink to
-   */
-  public void shrinkSizeIfNecessary(int size) {
-    buf.shrinkSizeIfNecessary(size);
-  }
-
-  public void expandIfNecessary(int size) {
+  public void resizeIfNecessary(int size) {
     buf.resizeIfNecessary(size);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferWriteTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
similarity index 78%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferWriteTransport.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
index f7024e6..26807fb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferWriteTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
 
-import org.apache.thrift.transport.AutoExpandingBuffer;
 import org.apache.thrift.transport.TTransport;
 
 /**
@@ -28,11 +27,11 @@ import org.apache.thrift.transport.TTransport;
  */
 public class AutoScalingBufferWriteTransport extends TTransport {
 
-  private final AutoScalingBuffer buf;
+  private final AutoExpandingBuffer buf;
   private int pos;
 
-  public AutoScalingBufferWriteTransport(int initialCapacity, double growthCoefficient) {
-    this.buf = new AutoScalingBuffer(initialCapacity, growthCoefficient);
+  public AutoScalingBufferWriteTransport(int initialCapacity) {
+    this.buf = new AutoExpandingBuffer(initialCapacity);
     this.pos = 0;
   }
 
@@ -63,10 +62,6 @@ public class AutoScalingBufferWriteTransport extends TTransport {
     pos += len;
   }
 
-  public AutoExpandingBuffer getBuf() {
-    return buf;
-  }
-
   public int getPos() {
     return pos;
   }
@@ -75,12 +70,11 @@ public class AutoScalingBufferWriteTransport extends TTransport {
     pos = 0;
   }
 
-  /**
-   * shrink the buffer to the specific size
-   *
-   * @param size The size of the target you want to shrink to
-   */
-  public void shrinkSizeIfNecessary(int size) {
-    buf.shrinkSizeIfNecessary(size);
+  public void resizeIfNecessary(int size) {
+    buf.resizeIfNecessary(size);
+  }
+
+  public AutoExpandingBuffer getBuf() {
+    return buf;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
similarity index 51%
copy from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
copy to service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
index f4d9a95..6a30b7a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
@@ -17,27 +17,31 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
 
-import java.net.SocketException;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.thrift.transport.TSocket;
+import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport.Factory;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
 
-public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport {
+public class RpcTransportFactory extends TTransportFactory {
 
-  private TSocket underlying;
-
-  public TimeoutChangeableTFastFramedTransport(TSocket underlying) {
-    super(underlying);
-    this.underlying = underlying;
+  // TODO: make it a config
+  public static final boolean USE_SNAPPY = true;
+  public static final RpcTransportFactory INSTANCE;
+  static {
+    INSTANCE = USE_SNAPPY ?
+        new RpcTransportFactory(new TimeoutChangeableTSnappyFramedTransport.Factory()) :
+        new RpcTransportFactory(new Factory());
   }
 
-  public void setTimeout(int timeout) {
-    underlying.setTimeout(timeout);
+  private TTransportFactory inner;
+
+  public RpcTransportFactory(TTransportFactory inner) {
+    this.inner = inner;
   }
 
-  @TestOnly
-  public int getTimeOut() throws SocketException {
-    return underlying.getSocket().getSoTimeout();
+  @Override
+  public TTransport getTransport(TTransport trans) {
+    return inner.getTransport(trans);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
similarity index 96%
copy from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
copy to service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 7ad4a8c..3fddcf1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -65,8 +65,8 @@ public class TElasticFramedTransport extends TFastFramedTransport {
     super(underlying, initialBufferCapacity, maxLength);
     this.underlying = underlying;
     this.maxLength = maxLength;
-    readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity, 1.5);
-    writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity, 1.5);
+    readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
+    writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
     writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
     readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
   }
@@ -109,7 +109,7 @@ public class TElasticFramedTransport extends TFastFramedTransport {
       readCompressBuffer.getByteBuffer().position(0);
 
       if (uncompressedLength < maxLength) {
-        readBuffer.shrinkSizeIfNecessary(maxLength);
+        readBuffer.resizeIfNecessary(maxLength);
       }
       readBuffer.fill(readCompressBuffer, uncompressedLength);
     } catch (IOException e) {
@@ -147,7 +147,7 @@ public class TElasticFramedTransport extends TFastFramedTransport {
     }
 
     writeBuffer.reset();
-    writeBuffer.shrinkSizeIfNecessary(maxLength);
+    writeBuffer.resizeIfNecessary(maxLength);
     underlying.flush();
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
similarity index 91%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 7ad4a8c..307b363 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -28,7 +28,7 @@ import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 import org.xerial.snappy.Snappy;
 
-public class TElasticFramedTransport extends TFastFramedTransport {
+public class TSnappyElasticFramedTransport extends TFastFramedTransport {
 
   private TByteBuffer writeCompressBuffer;
   private TByteBuffer readCompressBuffer;
@@ -53,20 +53,20 @@ public class TElasticFramedTransport extends TFastFramedTransport {
 
     @Override
     public TTransport getTransport(TTransport trans) {
-      return new TElasticFramedTransport(trans, initialCapacity, maxLength);
+      return new TSnappyElasticFramedTransport(trans, initialCapacity, maxLength);
     }
   }
 
-  public TElasticFramedTransport(TTransport underlying) {
+  public TSnappyElasticFramedTransport(TTransport underlying) {
     this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
   }
 
-  public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
+  public TSnappyElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
     super(underlying, initialBufferCapacity, maxLength);
     this.underlying = underlying;
     this.maxLength = maxLength;
-    readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity, 1.5);
-    writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity, 1.5);
+    readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
+    writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
     writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
     readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
   }
@@ -109,7 +109,7 @@ public class TElasticFramedTransport extends TFastFramedTransport {
       readCompressBuffer.getByteBuffer().position(0);
 
       if (uncompressedLength < maxLength) {
-        readBuffer.shrinkSizeIfNecessary(maxLength);
+        readBuffer.resizeIfNecessary(maxLength);
       }
       readBuffer.fill(readCompressBuffer, uncompressedLength);
     } catch (IOException e) {
@@ -147,7 +147,7 @@ public class TElasticFramedTransport extends TFastFramedTransport {
     }
 
     writeBuffer.reset();
-    writeBuffer.shrinkSizeIfNecessary(maxLength);
+    writeBuffer.resizeIfNecessary(maxLength);
     underlying.flush();
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
similarity index 76%
copy from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
copy to service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index f4d9a95..4aa50a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -17,13 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
 
 import java.net.SocketException;
-import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
 
-public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport {
+public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport implements TimeoutChangeableTransport {
 
   private TSocket underlying;
 
@@ -36,8 +37,15 @@ public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTranspo
     underlying.setTimeout(timeout);
   }
 
-  @TestOnly
   public int getTimeOut() throws SocketException {
     return underlying.getSocket().getSoTimeout();
   }
+
+  public static class Factory extends TTransportFactory {
+
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      return new TimeoutChangeableTSnappyFramedTransport((TSocket) trans);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
similarity index 68%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index f4d9a95..b251f3f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -17,17 +17,18 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
 
 import java.net.SocketException;
-import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
 
-public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport {
+public class TimeoutChangeableTSnappyFramedTransport extends TSnappyElasticFramedTransport implements TimeoutChangeableTransport {
 
   private TSocket underlying;
 
-  public TimeoutChangeableTFastFramedTransport(TSocket underlying) {
+  public TimeoutChangeableTSnappyFramedTransport(TSocket underlying) {
     super(underlying);
     this.underlying = underlying;
   }
@@ -36,8 +37,14 @@ public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTranspo
     underlying.setTimeout(timeout);
   }
 
-  @TestOnly
   public int getTimeOut() throws SocketException {
     return underlying.getSocket().getSoTimeout();
   }
+
+  public static class Factory extends TTransportFactory {
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      return new TimeoutChangeableTSnappyFramedTransport((TSocket) trans);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTransport.java
similarity index 61%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBuffer.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTransport.java
index 3408a89..6d34d8d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBuffer.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTransport.java
@@ -17,23 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.client.rpcutils;
 
-import java.util.Arrays;
-import org.apache.thrift.transport.AutoExpandingBuffer;
+package org.apache.iotdb.rpc;
 
-public class AutoScalingBuffer extends AutoExpandingBuffer {
+import java.net.SocketException;
 
-  private byte[] array;
+public interface TimeoutChangeableTransport {
+  void setTimeout(int timeout);
 
-  public AutoScalingBuffer(int initialCapacity, double growthCoefficient) {
-    super(initialCapacity, growthCoefficient);
-    this.array = new byte[initialCapacity];
-  }
 
-  public void shrinkSizeIfNecessary(int size) {
-    if (array.length > size) {
-      array = Arrays.copyOf(array, size);
-    }
-  }
+  int getTimeOut() throws SocketException;
+
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index d5ed5eb..28b18eb 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -23,6 +23,7 @@ import java.time.ZoneId;
 import java.util.List;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
@@ -47,7 +48,6 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -75,7 +75,7 @@ public class SessionConnection {
   }
 
   private void init(EndPoint endPoint) throws IoTDBConnectionException {
-    transport = new TFastFramedTransport(
+    transport = RpcTransportFactory.INSTANCE.getTransport(
         new TSocket(endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs));
 
     try {