You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/01/06 03:17:05 UTC

[iotdb] branch jira-1103-master created (now 54593bf)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch jira-1103-master
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 54593bf  [IOTDB-1103] Fix frame size larger than max length error

This branch includes the following new commits:

     new 54593bf  [IOTDB-1103] Fix frame size larger than max length error

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-1103] Fix frame size larger than max length error

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira-1103-master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 54593bf2b80edd0f3f9694a067962caafe115a8a
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Jan 6 11:16:13 2021 +0800

    [IOTDB-1103] Fix frame size larger than max length error
---
 .../main/java/org/apache/iotdb/jdbc/Config.java    | 16 ++++++-
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     | 15 ++++---
 .../apache/iotdb/jdbc/IoTDBConnectionParams.java   | 18 ++++++++
 .../src/main/java/org/apache/iotdb/jdbc/Utils.java |  7 +++
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 41 +++++++++--------
 .../org/apache/iotdb/rpc/RpcTransportFactory.java  | 14 +++++-
 .../rpc/TimeoutChangeableTFastFramedTransport.java | 11 ++++-
 .../main/java/org/apache/iotdb/session/Config.java | 10 +++++
 .../java/org/apache/iotdb/session/Session.java     | 51 ++++++++++++++--------
 .../apache/iotdb/session/SessionConnection.java    |  2 +
 10 files changed, 138 insertions(+), 47 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 7fc20c2..210b981 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -20,7 +20,8 @@ package org.apache.iotdb.jdbc;
 
 public class Config {
 
-  private Config(){}
+  private Config() {
+  }
 
   /**
    * The required prefix for the connection URL.
@@ -54,4 +55,17 @@ public class Config {
 
   public static boolean rpcThriftCompressionEnable = false;
 
+  /**
+   * thrift init buffer size, 1KB by default
+   */
+  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+
+  public static final String INITIAL_BUFFER_CAPACITY = "initial_buffer_capacity";
+
+  /**
+   * thrift max frame size (16384000 bytes by default), we change it to 64MB
+   */
+  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+
+  public static final String MAX_FRAME_SIZE = "max_frame_size";
 }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 1f954e3..c96c960 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -85,10 +85,9 @@ public class IoTDBConnection implements Connection {
     params = Utils.parseUrl(url, info);
 
     openTransport();
-    if(Config.rpcThriftCompressionEnable) {
+    if (Config.rpcThriftCompressionEnable) {
       setClient(new TSIService.Client(new TCompactProtocol(transport)));
-    }
-    else {
+    } else {
       setClient(new TSIService.Client(new TBinaryProtocol(transport)));
     }
     // open client session
@@ -127,7 +126,8 @@ public class IoTDBConnection implements Connection {
     try {
       getClient().closeSession(req);
     } catch (TException e) {
-      throw new SQLException("Error occurs when closing session at server. Maybe server is down.", e);
+      throw new SQLException("Error occurs when closing session at server. Maybe server is down.",
+          e);
     } finally {
       isClosed = true;
       if (transport != null) {
@@ -414,6 +414,8 @@ public class IoTDBConnection implements Connection {
   }
 
   private void openTransport() throws TTransportException {
+    RpcTransportFactory.INSTANCE.setInitialBufferCapacity(params.getInitialBufferCapacity());
+    RpcTransportFactory.INSTANCE.setMaxLength(params.getMaxFrameSize());
     transport = RpcTransportFactory.INSTANCE
         .getTransport(new TSocket(params.getHost(), params.getPort(),
             Config.connectionTimeoutInMs));
@@ -471,10 +473,9 @@ public class IoTDBConnection implements Connection {
         if (transport != null) {
           transport.close();
           openTransport();
-          if(Config.rpcThriftCompressionEnable) {
+          if (Config.rpcThriftCompressionEnable) {
             setClient(new TSIService.Client(new TCompactProtocol(transport)));
-          }
-          else {
+          } else {
             setClient(new TSIService.Client(new TBinaryProtocol(transport)));
           }
           openSession();
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
index 7590828..d0c585f 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
@@ -27,6 +27,9 @@ public class IoTDBConnectionParams {
   private String username = Config.DEFAULT_USER;
   private String password = Config.DEFAULT_PASSWORD;
 
+  private int initialBufferCapacity = Config.DEFAULT_INITIAL_BUFFER_CAPACITY;
+  private int maxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE;
+
   public IoTDBConnectionParams(String url) {
     this.jdbcUriString = url;
   }
@@ -79,4 +82,19 @@ public class IoTDBConnectionParams {
     this.password = password;
   }
 
+  public int getInitialBufferCapacity() {
+    return initialBufferCapacity;
+  }
+
+  public void setInitialBufferCapacity(int initialBufferCapacity) {
+    this.initialBufferCapacity = initialBufferCapacity;
+  }
+
+  public int getMaxFrameSize() {
+    return maxFrameSize;
+  }
+
+  public void setMaxFrameSize(int maxFrameSize) {
+    this.maxFrameSize = maxFrameSize;
+  }
 }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
index 156f645..578907c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
@@ -60,6 +60,13 @@ public class Utils {
     if (info.containsKey(Config.AUTH_PASSWORD)) {
       params.setPassword(info.getProperty(Config.AUTH_PASSWORD));
     }
+    if (info.containsKey(Config.INITIAL_BUFFER_CAPACITY)) {
+      params.setInitialBufferCapacity(Integer.parseInt(
+          info.getProperty(Config.INITIAL_BUFFER_CAPACITY)));
+    }
+    if (info.containsKey(Config.MAX_FRAME_SIZE)) {
+      params.setMaxFrameSize(Integer.parseInt(info.getProperty(Config.MAX_FRAME_SIZE)));
+    }
 
     return params;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 2d94da4..728b2ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -88,6 +89,8 @@ public class SyncClient implements ISyncClient {
 
   private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
 
+  private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+
   private static final int TIMEOUT_MS = 1000;
 
   /**
@@ -230,9 +233,9 @@ public class SyncClient implements ISyncClient {
     syncSchema();
 
     // 3. Sync all data
-    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    String[] dataDirs = ioTDBConfig.getDataDirs();
     logger.info("There are {} data dirs to be synced.", dataDirs.length);
-    for (int i = 0 ; i < dataDirs.length; i++) {
+    for (int i = 0; i < dataDirs.length; i++) {
       String dataDir = dataDirs[i];
       logger.info("Start to sync data in data dir {}, the process is {}/{}", dataDir, i + 1,
           dataDirs.length);
@@ -271,10 +274,12 @@ public class SyncClient implements ISyncClient {
 
   @Override
   public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
+    RpcTransportFactory.INSTANCE.setInitialBufferCapacity(ioTDBConfig.getThriftInitBufferSize());
+    RpcTransportFactory.INSTANCE.setMaxLength(ioTDBConfig.getThriftMaxFrameSize());
     transport = RpcTransportFactory.INSTANCE
         .getTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
-    TProtocol protocol = null;
-    if (IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+    TProtocol protocol;
+    if (ioTDBConfig.isRpcThriftCompressionEnable()) {
       protocol = new TCompactProtocol(transport);
     } else {
       protocol = new TBinaryProtocol(transport);
@@ -295,8 +300,8 @@ public class SyncClient implements ISyncClient {
   public void confirmIdentity() throws SyncConnectionException {
     try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
       ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(),
-          getOrCreateUUID(getUuidFile()),
-          IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION);
+          getOrCreateUUID(getUuidFile()), ioTDBConfig.getPartitionInterval(),
+          IoTDBConstant.VERSION);
       SyncStatus status = serviceClient
           .check(info);
       if (status.code != SUCCESS_CODE) {
@@ -406,7 +411,8 @@ public class SyncClient implements ISyncClient {
       return true;
     } else {
       logger
-          .error("Digest check of schema file {} failed, retry", getSchemaLogFile().getAbsoluteFile());
+          .error("Digest check of schema file {} failed, retry",
+              getSchemaLogFile().getAbsoluteFile());
       return false;
     }
   }
@@ -416,14 +422,14 @@ public class SyncClient implements ISyncClient {
       if (syncSchemaLogFile.exists()) {
         try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
           String pos = br.readLine();
-          if(pos != null) {
+          if (pos != null) {
             return Integer.parseInt(pos);
           }
         }
       }
     } catch (IOException e) {
       logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
-    } catch (NumberFormatException e){
+    } catch (NumberFormatException e) {
       logger.error("Sync schema pos is not valid", e);
     }
     return 0;
@@ -498,7 +504,8 @@ public class SyncClient implements ISyncClient {
   }
 
   @Override
-  public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId, Set<File> deletedFilesName)
+  public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId,
+      Set<File> deletedFilesName)
       throws IOException {
     if (deletedFilesName.isEmpty()) {
       logger.info("There has no deleted files to be synced in storage group {}", sgName);
@@ -599,7 +606,7 @@ public class SyncClient implements ISyncClient {
             ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
             bos.reset();
             SyncStatus status = serviceClient.syncData(buffToSend);
-            if(status.code == CONFLICT_CODE){
+            if (status.code == CONFLICT_CODE) {
               throw new SyncDeviceOwnerConflictException(status.msg);
             }
             if (status.code != SUCCESS_CODE) {
@@ -633,7 +640,7 @@ public class SyncClient implements ISyncClient {
     try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
       for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) {
         for (Set<File> files : currentLocalFiles.values()) {
-          for(File file: files) {
+          for (File file : files) {
             bw.write(file.getAbsolutePath());
             bw.newLine();
           }
@@ -661,23 +668,21 @@ public class SyncClient implements ISyncClient {
 
 
   private File getSchemaPosFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
+    return new File(ioTDBConfig.getSyncDir(),
         config.getSyncReceiverName() + File.separator + SyncConstant.SCHEMA_POS_FILE_NAME);
   }
 
   private File getSchemaLogFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
-        MetadataConstant.METADATA_LOG);
+    return new File(ioTDBConfig.getSchemaDir(), MetadataConstant.METADATA_LOG);
   }
 
   private File getLockFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
+    return new File(ioTDBConfig.getSyncDir(),
         config.getSyncReceiverName() + File.separator + SyncConstant.LOCK_FILE_NAME);
   }
 
   private File getUuidFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
-        SyncConstant.UUID_FILE_NAME);
+    return new File(ioTDBConfig.getSyncDir(), SyncConstant.UUID_FILE_NAME);
   }
 
   private static class InstanceHolder {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
index 57907de..7ce4e9d 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
@@ -29,10 +29,14 @@ public class RpcTransportFactory extends TTransportFactory {
   // TODO: make it a config
   public static boolean USE_SNAPPY = false;
   public static final RpcTransportFactory INSTANCE;
+
+  private static int initialBufferCapacity = RpcUtils.DEFAULT_BUF_CAPACITY;
+  private static int maxLength = RpcUtils.DEFAULT_MAX_LENGTH;
+
   static {
     INSTANCE = USE_SNAPPY ?
         new RpcTransportFactory(new TimeoutChangeableTSnappyFramedTransport.Factory()) :
-        new RpcTransportFactory(new Factory());
+        new RpcTransportFactory(new Factory(initialBufferCapacity, maxLength));
   }
 
   private TTransportFactory inner;
@@ -53,4 +57,12 @@ public class RpcTransportFactory extends TTransportFactory {
   public static void setUseSnappy(boolean useSnappy) {
     USE_SNAPPY = useSnappy;
   }
+
+  public void setInitialBufferCapacity(int initialBufferCapacity) {
+    RpcTransportFactory.initialBufferCapacity = initialBufferCapacity;
+  }
+
+  public void setMaxLength(int maxLength) {
+    RpcTransportFactory.maxLength = maxLength;
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index be3fca9..b34e979 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -43,12 +43,21 @@ public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTranspo
 
   public static class Factory extends TTransportFactory {
 
+    private final int initialBufferCapacity;
+
+    private final int maxLength;
+
+    public Factory(int initialBufferCapacity, int maxLength) {
+      this.initialBufferCapacity = initialBufferCapacity;
+      this.maxLength = maxLength;
+    }
+
     @Override
     public TTransport getTransport(TTransport trans) {
       if (trans instanceof TSocket) {
         return new TimeoutChangeableTFastFramedTransport((TSocket) trans);
       } else {
-        return new TElasticFramedTransport(trans);
+        return new TElasticFramedTransport(trans, initialBufferCapacity, maxLength);
       }
     }
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index cc0717e..d900df7 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -28,4 +28,14 @@ public class Config {
 
   public static final int RETRY_NUM = 3;
   public static final long RETRY_INTERVAL_MS = 1000;
+
+  /**
+   * thrift init buffer size, 1KB by default
+   */
+  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+
+  /**
+   * thrift max frame size (16384000 bytes by default), we change it to 64MB
+   */
+  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 6fd35d2..c6c8fe7 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
-import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
@@ -55,7 +54,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +69,9 @@ public class Session {
   protected boolean enableRPCCompression;
   protected int connectionTimeoutInMs;
 
+  protected int initialBufferCapacity;
+  protected int maxFrameSize;
+
   private EndPoint defaultEndPoint;
   private SessionConnection defaultSessionConnection;
   protected boolean isClosed = true;
@@ -84,39 +85,48 @@ public class Session {
 
   public Session(String host, int rpcPort) {
     this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE,
-        null);
+        null, Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, String rpcPort, String username, String password) {
-    this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null);
+    this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password) {
-    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null);
+    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password, int fetchSize) {
-    this(host, rpcPort, username, password, fetchSize, null);
+    this(host, rpcPort, username, password, fetchSize, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
-    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId);
+    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
+  @SuppressWarnings("squid:S107")
   public Session(String host, int rpcPort, String username, String password, int fetchSize,
-      ZoneId zoneId) {
+      ZoneId zoneId, int initialBufferCapacity, int maxFrameSize) {
     this.defaultEndPoint = new EndPoint(host, rpcPort);
     this.username = username;
     this.password = password;
     this.fetchSize = fetchSize;
     this.zoneId = zoneId;
+    this.initialBufferCapacity = initialBufferCapacity;
+    this.maxFrameSize = maxFrameSize;
   }
 
-  public void setFetchSize(int fetchSize){
+  public void setFetchSize(int fetchSize) {
     this.fetchSize = fetchSize;
   }
 
-  public int getFetchSize(){ return this.fetchSize; }
+  public int getFetchSize() {
+    return this.fetchSize;
+  }
 
   public synchronized void open() throws IoTDBConnectionException {
     open(false, Config.DEFAULT_TIMEOUT_MS);
@@ -576,6 +586,7 @@ public class Session {
       throws IoTDBConnectionException, StatementExecutionException {
     insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
   }
+
   /**
    * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
    * executeBatch, we pack some insert request in batch and send them to server. If you want improve
@@ -583,7 +594,7 @@ public class Session {
    * <p>
    * Each row is independent, which could have different deviceId, time, number of measurements
    *
-   * @param haveSorted  whether the times have been sorted
+   * @param haveSorted whether the times have been sorted
    * @see Session#insertTablet(Tablet)
    */
   public void insertRecordsOfOneDevice(String deviceId, List<Long> times,
@@ -595,8 +606,8 @@ public class Session {
       throw new IllegalArgumentException(
           "deviceIds, times, measurementsList and valuesList's size should be equal");
     }
-    TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times, measurementsList,
-        typesList, valuesList, haveSorted);
+    TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times,
+        measurementsList, typesList, valuesList, haveSorted);
     try {
       getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
     } catch (RedirectException e) {
@@ -604,9 +615,10 @@ public class Session {
     }
   }
 
-  private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId, List<Long> times,
-      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
-      List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, BatchExecutionException {
+  private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId,
+      List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted)
+      throws IoTDBConnectionException, BatchExecutionException {
     // check params size
     int len = times.size();
     if (len != measurementsList.size() || len != valuesList.size()) {
@@ -616,7 +628,8 @@ public class Session {
 
     if (haveSorted) {
       if (!checkSorted(times)) {
-        throw new BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
+        throw new BatchExecutionException(
+            "Times in InsertOneDeviceRecords are not in ascending order");
       }
     } else {
       //sort
@@ -654,8 +667,8 @@ public class Session {
     return Arrays.asList(result);
   }
 
-  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList)
-      throws IoTDBConnectionException {
+  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList,
+      List<List<TSDataType>> typesList) throws IoTDBConnectionException {
     List<ByteBuffer> buffersList = new ArrayList<>();
     for (int i = 0; i < valuesList.size(); i++) {
       ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index ce85103..71c6528 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -76,6 +76,8 @@ public class SessionConnection {
   }
 
   private void init(EndPoint endPoint) throws IoTDBConnectionException {
+    RpcTransportFactory.INSTANCE.setInitialBufferCapacity(session.initialBufferCapacity);
+    RpcTransportFactory.INSTANCE.setMaxLength(session.maxFrameSize);
     transport = RpcTransportFactory.INSTANCE.getTransport(
         new TSocket(endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs));