You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/01/05 08:56:05 UTC

[iotdb] branch jira-1103 created (now 9baba06)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch jira-1103
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 9baba06  [IOTDB-1103] Fix frame size larger than max length error

This branch includes the following new commits:

     new 9baba06  [IOTDB-1103] Fix frame size larger than max length error

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-1103] Fix frame size larger than max length error

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira-1103
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9baba06efc1f5b7838735721909e01dbcdab5ec2
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Jan 5 16:55:25 2021 +0800

    [IOTDB-1103] Fix frame size larger than max length error
---
 .../main/java/org/apache/iotdb/jdbc/Config.java    |  9 ++++
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     | 24 ++++++----
 .../apache/iotdb/jdbc/IoTDBConnectionParams.java   | 10 ++++
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 43 ++++++++---------
 .../main/java/org/apache/iotdb/session/Config.java | 10 ++++
 .../java/org/apache/iotdb/session/Session.java     | 54 +++++++++++++++-------
 6 files changed, 103 insertions(+), 47 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 9025607..457f732 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -54,4 +54,13 @@ public class Config {
 
   public static boolean rpcThriftCompressionEnable = false;
 
+  /**
+   * thrift init buffer size, 1KB by default
+   */
+  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+
+  /**
+   * thrift max frame size in bytes; thrift's own default is 16384000, we raise it to 64MB
+   */
+  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 85464cd..5a0578d 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -40,7 +40,14 @@ import java.util.Properties;
 import java.util.concurrent.Executor;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -78,10 +85,9 @@ public class IoTDBConnection implements Connection {
     params = Utils.parseUrl(url, info);
 
     openTransport();
-    if(Config.rpcThriftCompressionEnable) {
+    if (Config.rpcThriftCompressionEnable) {
       setClient(new TSIService.Client(new TCompactProtocol(transport)));
-    }
-    else {
+    } else {
       setClient(new TSIService.Client(new TBinaryProtocol(transport)));
     }
     // open client session
@@ -120,7 +126,8 @@ public class IoTDBConnection implements Connection {
     try {
       getClient().closeSession(req);
     } catch (TException e) {
-      throw new SQLException("Error occurs when closing session at server. Maybe server is down.", e);
+      throw new SQLException("Error occurs when closing session at server. Maybe server is down.",
+          e);
     } finally {
       isClosed = true;
       if (transport != null) {
@@ -408,7 +415,7 @@ public class IoTDBConnection implements Connection {
 
   private void openTransport() throws TTransportException {
     transport = new TFastFramedTransport(new TSocket(params.getHost(), params.getPort(),
-        Config.connectionTimeoutInMs));
+        Config.connectionTimeoutInMs), params.getInitialBufferCapacity(), params.getMaxFrameSize());
     if (!transport.isOpen()) {
       transport.open();
     }
@@ -463,10 +470,9 @@ public class IoTDBConnection implements Connection {
         if (transport != null) {
           transport.close();
           openTransport();
-          if(Config.rpcThriftCompressionEnable) {
+          if (Config.rpcThriftCompressionEnable) {
             setClient(new TSIService.Client(new TCompactProtocol(transport)));
-          }
-          else {
+          } else {
             setClient(new TSIService.Client(new TBinaryProtocol(transport)));
           }
           openSession();
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
index dfd1772..1bbb059 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
@@ -27,6 +27,9 @@ public class IoTDBConnectionParams {
   private String username = Config.DEFAULT_USER;
   private String password = Config.DEFALUT_PASSWORD;
 
+  private int initialBufferCapacity = Config.DEFAULT_INITIAL_BUFFER_CAPACITY;
+  private int maxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE;
+
   public IoTDBConnectionParams(String url) {
     this.jdbcUriString = url;
   }
@@ -79,4 +82,11 @@ public class IoTDBConnectionParams {
     this.password = password;
   }
 
+  public int getInitialBufferCapacity() {
+    return initialBufferCapacity;
+  }
+
+  public int getMaxFrameSize() {
+    return maxFrameSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index fdee50c..cf8a744 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -89,6 +90,8 @@ public class SyncClient implements ISyncClient {
 
   private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
 
+  private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+
   private static final int BATCH_LINE = 1000;
 
   private static final int TIMEOUT_MS = 1000;
@@ -233,9 +236,9 @@ public class SyncClient implements ISyncClient {
     syncSchema();
 
     // 3. Sync all data
-    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    String[] dataDirs = ioTDBConfig.getDataDirs();
     logger.info("There are {} data dirs to be synced.", dataDirs.length);
-    for (int i = 0 ; i < dataDirs.length; i++) {
+    for (int i = 0; i < dataDirs.length; i++) {
       String dataDir = dataDirs[i];
       logger.info("Start to sync data in data dir {}, the process is {}/{}", dataDir, i + 1,
           dataDirs.length);
@@ -274,9 +277,10 @@ public class SyncClient implements ISyncClient {
 
   @Override
   public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
-    transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
-    TProtocol protocol = null;
-    if (IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+    transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS),
+        ioTDBConfig.getThriftInitBufferSize(), ioTDBConfig.getThriftMaxFrameSize());
+    TProtocol protocol;
+    if (ioTDBConfig.isRpcThriftCompressionEnable()) {
       protocol = new TCompactProtocol(transport);
     } else {
       protocol = new TBinaryProtocol(transport);
@@ -296,10 +300,9 @@ public class SyncClient implements ISyncClient {
   public void confirmIdentity() throws SyncConnectionException {
     try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
       ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(),
-          getOrCreateUUID(getUuidFile()),
-          IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION);
-      SyncStatus status = serviceClient
-          .check(info);
+          getOrCreateUUID(getUuidFile()), ioTDBConfig.getPartitionInterval(),
+          IoTDBConstant.VERSION);
+      SyncStatus status = serviceClient.check(info);
       if (status.code != SUCCESS_CODE) {
         throw new SyncConnectionException(
             "The receiver rejected the synchronization task because " + status.msg);
@@ -432,14 +435,14 @@ public class SyncClient implements ISyncClient {
       if (syncSchemaLogFile.exists()) {
         try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
           String pos = br.readLine();
-          if(pos != null) {
+          if (pos != null) {
             return Integer.parseInt(pos);
           }
         }
       }
     } catch (IOException e) {
       logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
-    } catch (NumberFormatException e){
+    } catch (NumberFormatException e) {
       logger.error("Sync schema pos is not valid", e);
     }
     return 0;
@@ -514,8 +517,8 @@ public class SyncClient implements ISyncClient {
   }
 
   @Override
-  public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId, Set<File> deletedFilesName)
-      throws IOException {
+  public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId,
+      Set<File> deletedFilesName) throws IOException {
     if (deletedFilesName.isEmpty()) {
       logger.info("There has no deleted files to be synced in storage group {}", sgName);
       return;
@@ -615,7 +618,7 @@ public class SyncClient implements ISyncClient {
             ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
             bos.reset();
             SyncStatus status = serviceClient.syncData(buffToSend);
-            if(status.code == CONFLICT_CODE){
+            if (status.code == CONFLICT_CODE) {
               throw new SyncDeviceOwnerConflictException(status.msg);
             }
             if (status.code != SUCCESS_CODE) {
@@ -649,7 +652,7 @@ public class SyncClient implements ISyncClient {
     try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
       for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) {
         for (Set<File> files : currentLocalFiles.values()) {
-          for(File file: files) {
+          for (File file : files) {
             bw.write(file.getAbsolutePath());
             bw.newLine();
           }
@@ -677,23 +680,21 @@ public class SyncClient implements ISyncClient {
 
 
   private File getSchemaPosFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
+    return new File(ioTDBConfig.getSyncDir(),
         config.getSyncReceiverName() + File.separator + SyncConstant.SCHEMA_POS_FILE_NAME);
   }
 
   private File getSchemaLogFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
-        MetadataConstant.METADATA_LOG);
+    return new File(ioTDBConfig.getSchemaDir(), MetadataConstant.METADATA_LOG);
   }
 
   private File getLockFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
+    return new File(ioTDBConfig.getSyncDir(),
         config.getSyncReceiverName() + File.separator + SyncConstant.LOCK_FILE_NAME);
   }
 
   private File getUuidFile() {
-    return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
-        SyncConstant.UUID_FILE_NAME);
+    return new File(ioTDBConfig.getSyncDir(), SyncConstant.UUID_FILE_NAME);
   }
 
   private static class InstanceHolder {
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index b7cfb24..3d31eab 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -26,4 +26,14 @@ public class Config {
   public static final int DEFAULT_TIMEOUT_MS = 0;
   public static final int RETRY_NUM = 3;
   public static final long RETRY_INTERVAL_MS = 1000;
+
+  /**
+   * thrift init buffer size, 1KB by default
+   */
+  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+
+  /**
+   * thrift max frame size in bytes; thrift's own default is 16384000, we raise it to 64MB
+   */
+  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index ffb65ad..beb9056 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -76,6 +76,8 @@ public class Session {
   private int rpcPort;
   private String username;
   private String password;
+  private int initialBufferCapacity;
+  private int maxFrameSize;
   private TSIService.Iface client = null;
   private long sessionId;
   private TTransport transport;
@@ -87,32 +89,46 @@ public class Session {
   private int connectionTimeoutInMs;
 
   public Session(String host, int rpcPort) {
-    this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE, null);
+    this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE,
+        null, Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, String rpcPort, String username, String password) {
-    this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null);
+    this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password) {
-    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null);
+    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password, int fetchSize) {
-    this(host, rpcPort, username, password, fetchSize, null);
+    this(host, rpcPort, username, password, fetchSize, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
   public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
-    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId);
+    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
   }
 
-  public Session(String host, int rpcPort, String username, String password, int fetchSize, ZoneId zoneId) {
+  public Session(String host, int rpcPort, String username, String password, int fetchSize,
+      ZoneId zoneId) {
+    this(host, rpcPort, username, password, fetchSize, zoneId,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
+  }
+
+  public Session(String host, int rpcPort, String username, String password, int fetchSize,
+      ZoneId zoneId, int initialBufferCapacity, int maxFrameSize) {
     this.host = host;
     this.rpcPort = rpcPort;
     this.username = username;
     this.password = password;
     this.fetchSize = fetchSize;
     this.zoneId = zoneId;
+    this.initialBufferCapacity = initialBufferCapacity;
+    this.maxFrameSize = maxFrameSize;
   }
 
   public synchronized void open() throws IoTDBConnectionException {
@@ -132,7 +148,8 @@ public class Session {
     this.enableRPCCompression = enableRPCCompression;
     this.connectionTimeoutInMs = connectionTimeoutInMs;
 
-    transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs));
+    transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs),
+        initialBufferCapacity, maxFrameSize);
 
     if (!transport.isOpen()) {
       try {
@@ -393,8 +410,9 @@ public class Session {
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-     insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
+    insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
   }
+
   /**
    * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
    * executeBatch, we pack some insert request in batch and send them to server. If you want improve
@@ -402,15 +420,15 @@ public class Session {
    * <p>
    * Each row is independent, which could have different deviceId, time, number of measurements
    *
-   * @param haveSorted  whether the times have been sorted
+   * @param haveSorted whether the times have been sorted
    * @see Session#insertTablet(Tablet)
    */
   public void insertRecordsOfOneDevice(String deviceId, List<Long> times,
       List<List<String>> measurementsList, List<List<TSDataType>> typesList,
       List<List<Object>> valuesList, boolean haveSorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times, measurementsList,
-        typesList, valuesList, haveSorted);
+    TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times,
+        measurementsList, typesList, valuesList, haveSorted);
     try {
       RpcUtils.verifySuccess(client.insertRecordsOfOneDevice(request));
     } catch (TException e) {
@@ -427,9 +445,10 @@ public class Session {
     }
   }
 
-  private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId, List<Long> times,
-      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
-      List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, BatchExecutionException {
+  private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId,
+      List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList, boolean haveSorted)
+      throws IoTDBConnectionException, BatchExecutionException {
     // check params size
     int len = times.size();
     if (len != measurementsList.size() || len != valuesList.size()) {
@@ -439,7 +458,8 @@ public class Session {
 
     if (haveSorted) {
       if (!checkSorted(times)) {
-        throw new BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
+        throw new BatchExecutionException(
+            "Times in InsertOneDeviceRecords are not in ascending order");
       }
     } else {
       //sort
@@ -478,8 +498,8 @@ public class Session {
     return Arrays.asList(result);
   }
 
-  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList)
-      throws IoTDBConnectionException {
+  private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList,
+      List<List<TSDataType>> typesList) throws IoTDBConnectionException {
     List<ByteBuffer> buffersList = new ArrayList<>();
     for (int i = 0; i < valuesList.size(); i++) {
       ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));