You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/01/05 08:56:06 UTC

[iotdb] 01/01: [IOTDB-1103] Fix frame size larger than max length error

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira-1103
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9baba06efc1f5b7838735721909e01dbcdab5ec2
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Jan 5 16:55:25 2021 +0800

    [IOTDB-1103] Fix frame size larger than max length error
---
 .../main/java/org/apache/iotdb/jdbc/Config.java    |  9 ++++
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     | 24 ++++++----
 .../apache/iotdb/jdbc/IoTDBConnectionParams.java   | 10 ++++
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 43 ++++++++---------
 .../main/java/org/apache/iotdb/session/Config.java | 10 ++++
 .../java/org/apache/iotdb/session/Session.java     | 54 +++++++++++++++-------
 6 files changed, 103 insertions(+), 47 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 9025607..457f732 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -54,4 +54,13 @@ public class Config {
 
   public static boolean rpcThriftCompressionEnable = false;
 
+  /**
+   * thrift init buffer size, 1KB by default
+   */
+  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+
+  /**
+   * thrift max frame size (16384000 bytes by default), we change it to 64MB
+   */
+  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 85464cd..5a0578d 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -40,7 +40,14 @@ import java.util.Properties;
 import java.util.concurrent.Executor;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -78,10 +85,9 @@ public class IoTDBConnection implements Connection {
     params = Utils.parseUrl(url, info);
 
     openTransport();
-    if(Config.rpcThriftCompressionEnable) {
+    if (Config.rpcThriftCompressionEnable) {
       setClient(new TSIService.Client(new TCompactProtocol(transport)));
-    }
-    else {
+    } else {
       setClient(new TSIService.Client(new TBinaryProtocol(transport)));
     }
     // open client session
@@ -120,7 +126,8 @@ public class IoTDBConnection implements Connection {
     try {
       getClient().closeSession(req);
     } catch (TException e) {
-      throw new SQLException("Error occurs when closing session at server. Maybe server is down.", e);
+      throw new SQLException("Error occurs when closing session at server. Maybe server is down.",
+          e);
     } finally {
       isClosed = true;
       if (transport != null) {
@@ -408,7 +415,7 @@ public class IoTDBConnection implements Connection {
 
   private void openTransport() throws TTransportException {
     transport = new TFastFramedTransport(new TSocket(params.getHost(), params.getPort(),
-        Config.connectionTimeoutInMs));
+        Config.connectionTimeoutInMs), params.getInitialBufferCapacity(), params.getMaxFrameSize());
     if (!transport.isOpen()) {
       transport.open();
     }
@@ -463,10 +470,9 @@ public class IoTDBConnection implements Connection {
         if (transport != null) {
           transport.close();
           openTransport();
-          if(Config.rpcThriftCompressionEnable) {
+          if (Config.rpcThriftCompressionEnable) {
             setClient(new TSIService.Client(new TCompactProtocol(transport)));
-          }
-          else {
+          } else {
             setClient(new TSIService.Client(new TBinaryProtocol(transport)));
           }
           openSession();
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
index dfd1772..1bbb059 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
@@ -27,6 +27,9 @@ public class IoTDBConnectionParams {
   private String username = Config.DEFAULT_USER;
   private String password = Config.DEFALUT_PASSWORD;
 
+  private int initialBufferCapacity = Config.DEFAULT_INITIAL_BUFFER_CAPACITY;
+  private int maxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE;
+
   public IoTDBConnectionParams(String url) {
     this.jdbcUriString = url;
   }
@@ -79,4 +82,11 @@ public class IoTDBConnectionParams {
     this.password = password;
   }
 
+  public int getInitialBufferCapacity() {
+    return initialBufferCapacity;
+  }
+
+  public int getMaxFrameSize() {
+    return maxFrameSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index fdee50c..cf8a744 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -89,6 +90,8 @@ public class SyncClient implements ISyncClient {
 
   private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
 
+  private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+
   private static final int BATCH_LINE = 1000;
 
   private static final int TIMEOUT_MS = 1000;
@@ -233,9 +236,9 @@ public class SyncClient implements ISyncClient {
     syncSchema();
 
     // 3. Sync all data
-    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    String[] dataDirs = ioTDBConfig.getDataDirs();
     logger.info("There are {} data dirs to be synced.", dataDirs.length);
-    for (int i = 0 ; i < dataDirs.length; i++) {
+    for (int i = 0; i < dataDirs.length; i++) {
       String dataDir = dataDirs[i];
       logger.info("Start to sync data in data dir {}, the process is {}/{}", dataDir, i + 1,
           dataDirs.length);
@@ -274,9 +277,10 @@ public class SyncClient implements ISyncClient {
 
   @Override
   public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
-    transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
-    TProtocol protocol = null;
-    if (IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+    transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS),
+        ioTDBConfig.getThriftInitBufferSize(), ioTDBConfig.getThriftMaxFrameSize());
+    TProtocol protocol;
+    if (ioTDBConfig.isRpcThriftCompressionEnable()) {
       protocol = new TCompactProtocol(transport);
     } else {
       protocol = new TBinaryProtocol(transport);
@@ -296,10 +300,9 @@ public class SyncClient implements ISyncClient {
   public void confirmIdentity() throws SyncConnectionException {
     try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
       ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(),
-          getOrCreateUUID(getUuidFile()),
-          IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION);
-      SyncStatus status = serviceClient
-          .check(info);
+          getOrCreateUUID(getUuidFile()), ioTDBConfig.getPartitionInterval(),
+          IoTDBConstant.VERSION);
+      SyncStatus status = serviceClient.check(info);
       if (status.code != SUCCESS_CODE) {
         throw new SyncConnectionException(
             "The receiver rejected the synchronization task because " + status.msg);
@@ -432,14 +435,14 @@ public class SyncClient implements ISyncClient {
       if (syncSchemaLogFile.exists()) {
         try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
           String pos = br.readLine();
-          if(pos != null) {
+          if (pos != null) {
             return Integer.parseInt(pos);
           }
         }
       }
     } catch (IOException e) {
       logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
-    } catch (NumberFormatException e){
+    } catch (NumberFormatException e) {
       logger.error("Sync schema pos is not valid", e);
     }
     return 0;
@@ -514,8 +517,8 @@ public class SyncClient implements ISyncClient {
   }
 
   @Override
-  public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId, Set<File> deletedFilesName)
-      throws IOException {
+  public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId,
+      Set<File> deletedFilesName) throws IOException {
     if (deletedFilesName.isEmpty()) {
       logger.info("There has no deleted files to be synced in storage group {}", sgName);
       return;
@@ -615,7 +618,7 @@ public class SyncClient implements ISyncClient {
             ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
             bos.reset();
             SyncStatus status = serviceClient.syncData(buffToSend);
-            if(status.code == CONFLICT_CODE){
+            if (status.code == CONFLICT_CODE) {
               throw new SyncDeviceOwnerConflictException(status.msg);
             }
             if (status.code != SUCCESS_CODE) {
@@ -649,7 +652,7 @@ public class SyncClient implements ISyncClient {
     try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
       for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) {
         for (Set<File> files : currentLocalFiles.values()) {
-          for(File file: files) {
+          for (File file : files) {
             bw.write(file.getAbsolutePath());
             bw.newLine();
           }
@@ -677,23 +680,21 @@ public class SyncClient implements ISyncClient {
 
 
   private File getSchemaPosFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
+    return new File(ioTDBConfig.getSyncDir(),
         config.getSyncReceiverName() + File.separator + SyncConstant.SCHEMA_POS_FILE_NAME);
   }
 
   private File getSchemaLogFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
-        MetadataConstant.METADATA_LOG);
+    return new File(ioTDBConfig.getSchemaDir(), MetadataConstant.METADATA_LOG);
   }
 
   private File getLockFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
+    return new File(ioTDBConfig.getSyncDir(),
         config.getSyncReceiverName() + File.separator + SyncConstant.LOCK_FILE_NAME);
   }
 
   private File getUuidFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
-        SyncConstant.UUID_FILE_NAME);
+    return new File(ioTDBConfig.getSyncDir(), SyncConstant.UUID_FILE_NAME);
   }
 
   private static class InstanceHolder {
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index b7cfb24..3d31eab 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -26,4 +26,14 @@ public class Config {
   public static final int DEFAULT_TIMEOUT_MS = 0;
   public static final int RETRY_NUM = 3;
   public static final long RETRY_INTERVAL_MS = 1000;
+
+  /**
+   * thrift init buffer size, 1KB by default
+   */
+  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+
+  /**
+   * thrift max frame size (16384000 bytes by default), we change it to 64MB
+   */
+  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index ffb65ad..beb9056 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -76,6 +76,8 @@ public class Session {
   private int rpcPort;
   private String username;
   private String password;
+  private int initialBufferCapacity;
+  private int maxFrameSize;
   private TSIService.Iface client = null;
   private long sessionId;
   private TTransport transport;
@@ -87,32 +89,46 @@ public class Session {
   private int connectionTimeoutInMs;
 
   public Session(String host, int rpcPort) {
-    this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE, null);
+    this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE,
+        null, Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, String rpcPort, String username, String password) {
-    this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null);
+    this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password) {
-    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null);
+    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password, int fetchSize) {
-    this(host, rpcPort, username, password, fetchSize, null);
+    this(host, rpcPort, username, password, fetchSize, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
-    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId);
+    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
-  public Session(String host, int rpcPort, String username, String password, int fetchSize, ZoneId zoneId) {
+  public Session(String host, int rpcPort, String username, String password, int fetchSize,
+      ZoneId zoneId) {
+    this(host, rpcPort, username, password, fetchSize, zoneId,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
+  }
+
+  public Session(String host, int rpcPort, String username, String password, int fetchSize,
+      ZoneId zoneId, int initialBufferCapacity, int maxFrameSize) {
     this.host = host;
     this.rpcPort = rpcPort;
     this.username = username;
     this.password = password;
     this.fetchSize = fetchSize;
     this.zoneId = zoneId;
+    this.initialBufferCapacity = initialBufferCapacity;
+    this.maxFrameSize = maxFrameSize;
   }
 
   public synchronized void open() throws IoTDBConnectionException {
@@ -132,7 +148,8 @@ public class Session {
     this.enableRPCCompression = enableRPCCompression;
     this.connectionTimeoutInMs = connectionTimeoutInMs;
 
-    transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs));
+    transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs),
+        initialBufferCapacity, maxFrameSize);
 
     if (!transport.isOpen()) {
       try {
@@ -393,8 +410,9 @@ public class Session {
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-     insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
+    insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
   }
+
   /**
    * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
    * executeBatch, we pack some insert request in batch and send them to server. If you want improve
@@ -402,15 +420,15 @@ public class Session {
    * <p>
    * Each row is independent, which could have different deviceId, time, number of measurements
    *
-   * @param haveSorted  whether the times have been sorted
+   * @param haveSorted whether the times have been sorted
    * @see Session#insertTablet(Tablet)
    */
   public void insertRecordsOfOneDevice(String deviceId, List<Long> times,
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList, boolean haveSorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times, measurementsList,
-        typesList, valuesList, haveSorted);
+    TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times,
+        measurementsList, typesList, valuesList, haveSorted);
     try {
       RpcUtils.verifySuccess(client.insertRecordsOfOneDevice(request));
     } catch (TException e) {
@@ -427,9 +445,10 @@ public class Session {
     }
   }
 
-  private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId, List<Long> times,
-      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
-      List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, BatchExecutionException {
+  private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId,
+      List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted)
+      throws IoTDBConnectionException, BatchExecutionException {
     // check params size
     int len = times.size();
     if (len != measurementsList.size() || len != valuesList.size()) {
@@ -439,7 +458,8 @@ public class Session {
 
     if (haveSorted) {
       if (!checkSorted(times)) {
-        throw new BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
+        throw new BatchExecutionException(
+            "Times in InsertOneDeviceRecords are not in ascending order");
       }
     } else {
       //sort
@@ -478,8 +498,8 @@ public class Session {
     return Arrays.asList(result);
   }
 
-  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList)
-      throws IoTDBConnectionException {
+  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList,
+      List<List<TSDataType>> typesList) throws IoTDBConnectionException {
     List<ByteBuffer> buffersList = new ArrayList<>();
     for (int i = 0; i < valuesList.size(); i++) {
       ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));