You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/01/05 08:56:06 UTC
[iotdb] 01/01: [IOTDB-1103] Fix frame size larger than max length error
This is an automated email from the ASF dual-hosted git repository.
sunzesong pushed a commit to branch jira-1103
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9baba06efc1f5b7838735721909e01dbcdab5ec2
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Tue Jan 5 16:55:25 2021 +0800
[IOTDB-1103] Fix frame size larger than max length error
---
.../main/java/org/apache/iotdb/jdbc/Config.java | 9 ++++
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 24 ++++++----
.../apache/iotdb/jdbc/IoTDBConnectionParams.java | 10 ++++
.../iotdb/db/sync/sender/transfer/SyncClient.java | 43 ++++++++---------
.../main/java/org/apache/iotdb/session/Config.java | 10 ++++
.../java/org/apache/iotdb/session/Session.java | 54 +++++++++++++++-------
6 files changed, 103 insertions(+), 47 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 9025607..457f732 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -54,4 +54,13 @@ public class Config {
public static boolean rpcThriftCompressionEnable = false;
+ /**
+ * thrift init buffer size, 1KB by default
+ */
+ public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+
+ /**
+ * thrift max frame size: Thrift's default is 16384000 bytes; IoTDB raises it to 64MB
+ */
+ public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 85464cd..5a0578d 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -40,7 +40,14 @@ import java.util.Properties;
import java.util.concurrent.Executor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -78,10 +85,9 @@ public class IoTDBConnection implements Connection {
params = Utils.parseUrl(url, info);
openTransport();
- if(Config.rpcThriftCompressionEnable) {
+ if (Config.rpcThriftCompressionEnable) {
setClient(new TSIService.Client(new TCompactProtocol(transport)));
- }
- else {
+ } else {
setClient(new TSIService.Client(new TBinaryProtocol(transport)));
}
// open client session
@@ -120,7 +126,8 @@ public class IoTDBConnection implements Connection {
try {
getClient().closeSession(req);
} catch (TException e) {
- throw new SQLException("Error occurs when closing session at server. Maybe server is down.", e);
+ throw new SQLException("Error occurs when closing session at server. Maybe server is down.",
+ e);
} finally {
isClosed = true;
if (transport != null) {
@@ -408,7 +415,7 @@ public class IoTDBConnection implements Connection {
private void openTransport() throws TTransportException {
transport = new TFastFramedTransport(new TSocket(params.getHost(), params.getPort(),
- Config.connectionTimeoutInMs));
+ Config.connectionTimeoutInMs), params.getInitialBufferCapacity(), params.getMaxFrameSize());
if (!transport.isOpen()) {
transport.open();
}
@@ -463,10 +470,9 @@ public class IoTDBConnection implements Connection {
if (transport != null) {
transport.close();
openTransport();
- if(Config.rpcThriftCompressionEnable) {
+ if (Config.rpcThriftCompressionEnable) {
setClient(new TSIService.Client(new TCompactProtocol(transport)));
- }
- else {
+ } else {
setClient(new TSIService.Client(new TBinaryProtocol(transport)));
}
openSession();
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
index dfd1772..1bbb059 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java
@@ -27,6 +27,9 @@ public class IoTDBConnectionParams {
private String username = Config.DEFAULT_USER;
private String password = Config.DEFALUT_PASSWORD;
+ private int initialBufferCapacity = Config.DEFAULT_INITIAL_BUFFER_CAPACITY;
+ private int maxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE;
+
public IoTDBConnectionParams(String url) {
this.jdbcUriString = url;
}
@@ -79,4 +82,11 @@ public class IoTDBConnectionParams {
this.password = password;
}
+ public int getInitialBufferCapacity() {
+ return initialBufferCapacity;
+ }
+
+ public int getMaxFrameSize() {
+ return maxFrameSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index fdee50c..cf8a744 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -89,6 +90,8 @@ public class SyncClient implements ISyncClient {
private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig();
+
private static final int BATCH_LINE = 1000;
private static final int TIMEOUT_MS = 1000;
@@ -233,9 +236,9 @@ public class SyncClient implements ISyncClient {
syncSchema();
// 3. Sync all data
- String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ String[] dataDirs = ioTDBConfig.getDataDirs();
logger.info("There are {} data dirs to be synced.", dataDirs.length);
- for (int i = 0 ; i < dataDirs.length; i++) {
+ for (int i = 0; i < dataDirs.length; i++) {
String dataDir = dataDirs[i];
logger.info("Start to sync data in data dir {}, the process is {}/{}", dataDir, i + 1,
dataDirs.length);
@@ -274,9 +277,10 @@ public class SyncClient implements ISyncClient {
@Override
public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
- transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
- TProtocol protocol = null;
- if (IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+ transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS),
+ ioTDBConfig.getThriftInitBufferSize(), ioTDBConfig.getThriftMaxFrameSize());
+ TProtocol protocol;
+ if (ioTDBConfig.isRpcThriftCompressionEnable()) {
protocol = new TCompactProtocol(transport);
} else {
protocol = new TBinaryProtocol(transport);
@@ -296,10 +300,9 @@ public class SyncClient implements ISyncClient {
public void confirmIdentity() throws SyncConnectionException {
try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(),
- getOrCreateUUID(getUuidFile()),
- IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION);
- SyncStatus status = serviceClient
- .check(info);
+ getOrCreateUUID(getUuidFile()), ioTDBConfig.getPartitionInterval(),
+ IoTDBConstant.VERSION);
+ SyncStatus status = serviceClient.check(info);
if (status.code != SUCCESS_CODE) {
throw new SyncConnectionException(
"The receiver rejected the synchronization task because " + status.msg);
@@ -432,14 +435,14 @@ public class SyncClient implements ISyncClient {
if (syncSchemaLogFile.exists()) {
try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) {
String pos = br.readLine();
- if(pos != null) {
+ if (pos != null) {
return Integer.parseInt(pos);
}
}
}
} catch (IOException e) {
logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e);
- } catch (NumberFormatException e){
+ } catch (NumberFormatException e) {
logger.error("Sync schema pos is not valid", e);
}
return 0;
@@ -514,8 +517,8 @@ public class SyncClient implements ISyncClient {
}
@Override
- public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId, Set<File> deletedFilesName)
- throws IOException {
+ public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId,
+ Set<File> deletedFilesName) throws IOException {
if (deletedFilesName.isEmpty()) {
logger.info("There has no deleted files to be synced in storage group {}", sgName);
return;
@@ -615,7 +618,7 @@ public class SyncClient implements ISyncClient {
ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray());
bos.reset();
SyncStatus status = serviceClient.syncData(buffToSend);
- if(status.code == CONFLICT_CODE){
+ if (status.code == CONFLICT_CODE) {
throw new SyncDeviceOwnerConflictException(status.msg);
}
if (status.code != SUCCESS_CODE) {
@@ -649,7 +652,7 @@ public class SyncClient implements ISyncClient {
try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) {
for (Set<File> files : currentLocalFiles.values()) {
- for(File file: files) {
+ for (File file : files) {
bw.write(file.getAbsolutePath());
bw.newLine();
}
@@ -677,23 +680,21 @@ public class SyncClient implements ISyncClient {
private File getSchemaPosFile() {
- return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
+ return new File(ioTDBConfig.getSyncDir(),
config.getSyncReceiverName() + File.separator + SyncConstant.SCHEMA_POS_FILE_NAME);
}
private File getSchemaLogFile() {
- return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(),
- MetadataConstant.METADATA_LOG);
+ return new File(ioTDBConfig.getSchemaDir(), MetadataConstant.METADATA_LOG);
}
private File getLockFile() {
- return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
+ return new File(ioTDBConfig.getSyncDir(),
config.getSyncReceiverName() + File.separator + SyncConstant.LOCK_FILE_NAME);
}
private File getUuidFile() {
- return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(),
- SyncConstant.UUID_FILE_NAME);
+ return new File(ioTDBConfig.getSyncDir(), SyncConstant.UUID_FILE_NAME);
}
private static class InstanceHolder {
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index b7cfb24..3d31eab 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -26,4 +26,14 @@ public class Config {
public static final int DEFAULT_TIMEOUT_MS = 0;
public static final int RETRY_NUM = 3;
public static final long RETRY_INTERVAL_MS = 1000;
+
+ /**
+ * thrift init buffer size, 1KB by default
+ */
+ public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+
+ /**
+ * thrift max frame size: Thrift's default is 16384000 bytes; IoTDB raises it to 64MB
+ */
+ public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index ffb65ad..beb9056 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -76,6 +76,8 @@ public class Session {
private int rpcPort;
private String username;
private String password;
+ private int initialBufferCapacity;
+ private int maxFrameSize;
private TSIService.Iface client = null;
private long sessionId;
private TTransport transport;
@@ -87,32 +89,46 @@ public class Session {
private int connectionTimeoutInMs;
public Session(String host, int rpcPort) {
- this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE, null);
+ this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE,
+ null, Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, String rpcPort, String username, String password) {
- this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null);
+ this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null,
+ Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, int rpcPort, String username, String password) {
- this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null);
+ this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null,
+ Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, int rpcPort, String username, String password, int fetchSize) {
- this(host, rpcPort, username, password, fetchSize, null);
+ this(host, rpcPort, username, password, fetchSize, null,
+ Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
- this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId);
+ this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId,
+ Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
}
- public Session(String host, int rpcPort, String username, String password, int fetchSize, ZoneId zoneId) {
+ public Session(String host, int rpcPort, String username, String password, int fetchSize,
+ ZoneId zoneId) {
+ this(host, rpcPort, username, password, fetchSize, zoneId,
+ Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE);
+ }
+
+ public Session(String host, int rpcPort, String username, String password, int fetchSize,
+ ZoneId zoneId, int initialBufferCapacity, int maxFrameSize) {
this.host = host;
this.rpcPort = rpcPort;
this.username = username;
this.password = password;
this.fetchSize = fetchSize;
this.zoneId = zoneId;
+ this.initialBufferCapacity = initialBufferCapacity;
+ this.maxFrameSize = maxFrameSize;
}
public synchronized void open() throws IoTDBConnectionException {
@@ -132,7 +148,8 @@ public class Session {
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
- transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs));
+ transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs),
+ initialBufferCapacity, maxFrameSize);
if (!transport.isOpen()) {
try {
@@ -393,8 +410,9 @@ public class Session {
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
+ insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false);
}
+
/**
* Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc
* executeBatch, we pack some insert request in batch and send them to server. If you want improve
@@ -402,15 +420,15 @@ public class Session {
* <p>
* Each row is independent, which could have different deviceId, time, number of measurements
*
- * @param haveSorted whether the times have been sorted
+ * @param haveSorted whether the times have been sorted
* @see Session#insertTablet(Tablet)
*/
public void insertRecordsOfOneDevice(String deviceId, List<Long> times,
List<List<String>> measurementsList, List<List<TSDataType>> typesList,
List<List<Object>> valuesList, boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
- TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times, measurementsList,
- typesList, valuesList, haveSorted);
+ TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times,
+ measurementsList, typesList, valuesList, haveSorted);
try {
RpcUtils.verifySuccess(client.insertRecordsOfOneDevice(request));
} catch (TException e) {
@@ -427,9 +445,10 @@ public class Session {
}
}
- private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId, List<Long> times,
- List<List<String>> measurementsList, List<List<TSDataType>> typesList,
- List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, BatchExecutionException {
+ private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId,
+ List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList, boolean haveSorted)
+ throws IoTDBConnectionException, BatchExecutionException {
// check params size
int len = times.size();
if (len != measurementsList.size() || len != valuesList.size()) {
@@ -439,7 +458,8 @@ public class Session {
if (haveSorted) {
if (!checkSorted(times)) {
- throw new BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order");
+ throw new BatchExecutionException(
+ "Times in InsertOneDeviceRecords are not in ascending order");
}
} else {
//sort
@@ -478,8 +498,8 @@ public class Session {
return Arrays.asList(result);
}
- private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList)
- throws IoTDBConnectionException {
+ private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList,
+ List<List<TSDataType>> typesList) throws IoTDBConnectionException {
List<ByteBuffer> buffersList = new ArrayList<>();
for (int i = 0; i < valuesList.size(); i++) {
ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));