You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/05/16 03:10:26 UTC
[iotdb] branch master updated: [IOTDB-1259] upgrade libthrift from 0.12.0/0.13.0 to 0.14.1 and updated related classes or files (#3163)
This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5fe8c84 [IOTDB-1259] upgrade libthrift from 0.12.0/0.13.0 to 0.14.1 and updated related classes or files (#3163)
5fe8c84 is described below
commit 5fe8c8428f4e381f2a2dff5e1e04ad43d9bbbeac
Author: HouliangQi <ne...@163.com>
AuthorDate: Sun May 16 11:09:58 2021 +0800
[IOTDB-1259] upgrade libthrift from 0.12.0/0.13.0 to 0.14.1 and updated related classes or files (#3163)
Co-authored-by: 宋秉华 <so...@iie.ac.cn>
Co-authored-by: Xiangdong Huang <hx...@qq.com>
Co-authored-by: xiangdong huang <sa...@gmail.com>
---
.github/workflows/client.yml | 4 ++
LICENSE-binary | 2 +-
client-cpp/src/main/CMakeLists.txt | 2 +-
client-cpp/src/test/CMakeLists.txt | 2 +-
cluster/pom.xml | 2 +-
.../cluster/client/async/AsyncDataClient.java | 4 +-
.../client/async/AsyncDataHeartbeatClient.java | 4 +-
.../cluster/client/async/AsyncMetaClient.java | 4 +-
.../client/async/AsyncMetaHeartbeatClient.java | 4 +-
.../iotdb/cluster/client/sync/SyncDataClient.java | 2 +
.../client/sync/SyncDataHeartbeatClient.java | 2 +
.../iotdb/cluster/client/sync/SyncMetaClient.java | 2 +
.../client/sync/SyncMetaHeartbeatClient.java | 2 +
.../cluster/client/sync/SyncDataClientTest.java | 6 +-
.../cluster/client/sync/SyncMetaClientTest.java | 6 +-
.../cluster/common/TestAsyncClientFactory.java | 4 +-
.../cluster/common/TestSyncClientFactory.java | 13 +++++
.../cluster/log/snapshot/DataSnapshotTest.java | 14 +++++
.../cluster/log/snapshot/PullSnapshotTaskTest.java | 14 +++++
.../server/clusterinfo/ClusterInfoServerTest.java | 33 +++++++++--
compile-tools/thrift/pom.xml | 2 +-
jdbc/src/main/feature/feature.xml | 2 +-
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 9 ++-
pom.xml | 10 ++--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 3 +-
.../apache/iotdb/db/sync/conf/SyncConstant.java | 5 +-
.../iotdb/db/sync/sender/transfer/SyncClient.java | 27 +++++----
.../apache/iotdb/db/utils/EnvironmentUtils.java | 10 +++-
.../iotdb/rpc/AutoScalingBufferReadTransport.java | 13 +++++
.../iotdb/rpc/AutoScalingBufferWriteTransport.java | 15 +++++
.../org/apache/iotdb/rpc/RpcTransportFactory.java | 3 +-
.../rpc/TCompressedElasticFramedTransport.java | 2 +-
.../org/apache/iotdb/rpc/TConfigurationConst.java | 31 ++++++++++
.../apache/iotdb/rpc/TElasticFramedTransport.java | 20 ++++++-
.../iotdb/rpc/TNonblockingSocketWrapper.java | 60 +++++++++++++++++++
.../java/org/apache/iotdb/rpc/TSocketWrapper.java | 68 ++++++++++++++++++++++
.../apache/iotdb/session/SessionConnection.java | 13 +++--
.../apache/iotdb/session/pool/SessionPoolTest.java | 18 +++++-
session/src/test/resources/logback.xml | 40 +++++++++++++
39 files changed, 419 insertions(+), 58 deletions(-)
diff --git a/.github/workflows/client.yml b/.github/workflows/client.yml
index f6a7bad..927c4cd 100644
--- a/.github/workflows/client.yml
+++ b/.github/workflows/client.yml
@@ -95,6 +95,10 @@ jobs:
cd D:\a\cpp\boost_1_72_0 ; `
.\bootstrap.bat ; `
.\b2.exe
+ - name: Install OpenSSL
+ run: Invoke-WebRequest https://mirror.firedaemon.com/OpenSSL/openssl-1.1.1k.zip -OutFile D:\a\cpp\openssl-1.1.1k.zip ; `
+ Expand-Archive D:\a\cpp\openssl-1.1.1k.zip -DestinationPath D:\a\cpp ; `
+ [Environment]::SetEnvironmentVariable("Path", $env:Path + ";D:\a\cpp\openssl-1.1\x64\bin", "User") ; `
- name: Add Flex and Bison Path
shell: bash
run: cd /d/a/cpp && unzip win_flex_bison.zip && mv win_flex.exe flex.exe && mv win_bison.exe bison.exe && echo 'export PATH=/d/a/cpp:$PATH' >> ~/.bash_profile && source ~/.bash_profile
diff --git a/LICENSE-binary b/LICENSE-binary
index e7113af..971dfdf 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -245,7 +245,7 @@ net.minidev:json-smart:2.3
com.google.code.findbugs:jsr305:3.0.2
com.nimbusds:lang-tag:1.4.4
com.librato.metrics:librato-java:2.1.0
-org.apache.thrift:libthrift:0.13.0
+org.apache.thrift:libthrift:0.14.1
io.dropwizard.metrics:metrics-core:3.2.6
io.dropwizard.metrics:metrics-json:3.2.6
io.dropwizard.metrics:metrics-jvm:3.2.6
diff --git a/client-cpp/src/main/CMakeLists.txt b/client-cpp/src/main/CMakeLists.txt
index 6e4e468..44c5aa4 100644
--- a/client-cpp/src/main/CMakeLists.txt
+++ b/client-cpp/src/main/CMakeLists.txt
@@ -38,7 +38,7 @@ ENDIF()
# Add Boost include path for MacOS
INCLUDE_DIRECTORIES(/usr/local/include)
# Add Thrift include directory
-INCLUDE_DIRECTORIES(${TOOLS_DIR}/thrift/target/thrift-0.13.0/lib/cpp/src)
+INCLUDE_DIRECTORIES(${TOOLS_DIR}/thrift/target/thrift-0.14.1/lib/cpp/src)
# Add ./generated-sources-cpp as a Cmake subdirectory
AUX_SOURCE_DIRECTORY(./generated-sources-cpp SESSION_SRCS)
diff --git a/client-cpp/src/test/CMakeLists.txt b/client-cpp/src/test/CMakeLists.txt
index ba4ddfc..2ace790 100644
--- a/client-cpp/src/test/CMakeLists.txt
+++ b/client-cpp/src/test/CMakeLists.txt
@@ -29,7 +29,7 @@ INCLUDE_DIRECTORIES(/usr/local/include)
# Add Session related include files
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/../main/generated-sources-cpp)
# Add Thrift include directory
-INCLUDE_DIRECTORIES(${TOOLS_DIR}/thrift/target/thrift-0.13.0/lib/cpp/src)
+INCLUDE_DIRECTORIES(${TOOLS_DIR}/thrift/target/thrift-0.14.1/lib/cpp/src)
find_package(Boost REQUIRED)
IF (DEFINED BOOST_INCLUDEDIR)
diff --git a/cluster/pom.xml b/cluster/pom.xml
index 96a4541..d143420 100644
--- a/cluster/pom.xml
+++ b/cluster/pom.xml
@@ -53,7 +53,7 @@
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
- <version>0.12.0</version>
+ <version>0.14.1</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index f1c56bf..cf0cd1c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncClient;
import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.TAsyncMethodCall;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +66,7 @@ public class AsyncDataClient extends AsyncClient {
super(
protocolFactory,
clientManager,
- new TNonblockingSocket(
+ TNonblockingSocketWrapper.wrap(
node.getInternalIp(), node.getDataPort(), RaftServer.getConnectionTimeoutInMS()));
this.node = node;
this.pool = pool;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
index 4d539a2..146c8b7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
@@ -23,10 +23,10 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TNonblockingSocket;
import java.io.IOException;
@@ -45,7 +45,7 @@ public class AsyncDataHeartbeatClient extends AsyncDataClient {
super(
protocolFactory,
clientManager,
- new TNonblockingSocket(
+ TNonblockingSocketWrapper.wrap(
node.getInternalIp(),
node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
RaftServer.getConnectionTimeoutInMS()));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
index d0bdde0..c615df0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
@@ -23,11 +23,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.TAsyncMethodCall;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +64,7 @@ public class AsyncMetaClient extends AsyncClient {
super(
protocolFactory,
clientManager,
- new TNonblockingSocket(
+ TNonblockingSocketWrapper.wrap(
node.getInternalIp(), node.getMetaPort(), RaftServer.getConnectionTimeoutInMS()));
this.node = node;
this.pool = pool;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
index 0a05ec3..babeae4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
@@ -23,10 +23,10 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TNonblockingSocket;
import java.io.IOException;
@@ -45,7 +45,7 @@ public class AsyncMetaHeartbeatClient extends AsyncMetaClient {
super(
protocolFactory,
clientManager,
- new TNonblockingSocket(
+ TNonblockingSocketWrapper.wrap(
node.getInternalIp(),
node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
RaftServer.getConnectionTimeoutInMS()));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index 56bae98..cf92d14 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.thrift.protocol.TProtocol;
@@ -56,6 +57,7 @@ public class SyncDataClient extends Client implements Closeable {
protocolFactory.getProtocol(
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
+ TConfigurationConst.defaultTConfiguration,
node.getInternalIp(),
node.getDataPort(),
RaftServer.getConnectionTimeoutInMS()))));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index 7b7cbdd..83603d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
@@ -41,6 +42,7 @@ public class SyncDataHeartbeatClient extends SyncDataClient {
protocolFactory.getProtocol(
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
+ TConfigurationConst.defaultTConfiguration,
node.getInternalIp(),
node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
RaftServer.getConnectionTimeoutInMS()))));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index 13b023c..d29e438 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -52,6 +53,7 @@ public class SyncMetaClient extends Client implements Closeable {
protocolFactory.getProtocol(
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
+ TConfigurationConst.defaultTConfiguration,
node.getInternalIp(),
node.getMetaPort(),
RaftServer.getConnectionTimeoutInMS()))));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index f496491..7a06668 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
@@ -41,6 +42,7 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient {
protocolFactory.getProtocol(
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
+ TConfigurationConst.defaultTConfiguration,
node.getInternalIp(),
node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
RaftServer.getConnectionTimeoutInMS()))));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
index f796fbf..0356b19 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
@@ -7,10 +7,10 @@ package org.apache.iotdb.cluster.client.sync;
import org.apache.iotdb.cluster.client.sync.SyncDataClient.FactorySync;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.rpc.TSocketWrapper;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.apache.thrift.transport.TSocket;
import org.junit.Test;
import java.io.IOException;
@@ -62,7 +62,7 @@ public class SyncDataClientTest {
client =
new SyncDataClient(
- new TBinaryProtocol(new TSocket(node.getInternalIp(), node.getDataPort())));
+ new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())));
// client without a belong pool will be closed after putBack()
client.putBack();
assertFalse(client.getInputProtocol().getTransport().isOpen());
@@ -112,7 +112,7 @@ public class SyncDataClientTest {
try (SyncDataClient clientIn =
new SyncDataClient(
- new TBinaryProtocol(new TSocket(node.getInternalIp(), node.getDataPort())))) {
+ new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())))) {
clientOut = clientIn;
}
// client without a belong pool will be closed after putBack()
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
index 00a367c..9a7a8ce 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
@@ -7,10 +7,10 @@ package org.apache.iotdb.cluster.client.sync;
import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.rpc.TSocketWrapper;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.apache.thrift.transport.TSocket;
import org.junit.Test;
import java.io.IOException;
@@ -54,7 +54,7 @@ public class SyncMetaClientTest {
client =
new SyncMetaClient(
- new TBinaryProtocol(new TSocket(node.getInternalIp(), node.getDataPort())));
+ new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())));
// client without a belong pool will be closed after putBack()
client.putBack();
assertFalse(client.getInputProtocol().getTransport().isOpen());
@@ -99,7 +99,7 @@ public class SyncMetaClientTest {
try (SyncMetaClient clientIn =
new SyncMetaClient(
- new TBinaryProtocol(new TSocket(node.getInternalIp(), node.getDataPort())))) {
+ new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())))) {
clientOut = clientIn;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncClientFactory.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncClientFactory.java
index ebddafe..cad2cc6 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncClientFactory.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncClientFactory.java
@@ -23,11 +23,11 @@ import org.apache.iotdb.cluster.client.async.AsyncClientFactory;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TNonblockingSocket;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -49,7 +49,7 @@ public class TestAsyncClientFactory extends AsyncClientFactory {
return new TestAsyncClient(
protocolFactory,
clientManager,
- new TNonblockingSocket(node.getInternalIp(), node.getMetaPort()),
+ TNonblockingSocketWrapper.wrap(node.getInternalIp(), node.getMetaPort()),
clientSerialNum.getAndIncrement());
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSyncClientFactory.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSyncClientFactory.java
index 86d09fc..eb15dd9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSyncClientFactory.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSyncClientFactory.java
@@ -24,9 +24,11 @@ import org.apache.iotdb.cluster.client.sync.SyncClientPool;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -66,6 +68,17 @@ public class TestSyncClientFactory implements SyncClientFactory {
public void write(byte[] bytes, int i, int i1) {
// do nothing
}
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes) throws TTransportException {}
};
return new TestSyncClient(
protocolFactory.getProtocol(dummyTransport),
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index 504fd6a..76ef24f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -40,10 +40,12 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
@@ -139,6 +141,18 @@ public abstract class DataSnapshotTest {
@Override
public void write(byte[] bytes, int i, int i1) {}
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes)
+ throws TTransportException {}
})) {
@Override
public ByteBuffer readFile(String filePath, long offset, int length)
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index 993db9a..f4933ed 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -48,11 +48,13 @@ import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -172,6 +174,18 @@ public class PullSnapshotTaskTest extends DataSnapshotTest {
@Override
public void write(byte[] buf, int off, int len) {}
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes)
+ throws TTransportException {}
});
}
};
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
index bdaf55c..d53748d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
@@ -34,6 +34,7 @@ import org.junit.Before;
import org.junit.Test;
public class ClusterInfoServerTest {
+
ClusterInfoServiceImplTest test;
ClusterInfoServer service;
@@ -53,11 +54,17 @@ public class ClusterInfoServerTest {
@Test
public void testConnect() {
- TTransport transport =
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
- ClusterDescriptor.getInstance().getConfig().getClusterInfoRpcPort()));
+ TTransport transport = null;
+ try {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+ ClusterDescriptor.getInstance().getConfig().getClusterInfoRpcPort()));
+ } catch (TTransportException e) {
+ Assert.fail(e.getMessage());
+ }
+
try {
transport.open();
} catch (TTransportException e) {
@@ -69,5 +76,21 @@ public class ClusterInfoServerTest {
Assert.assertNotNull(client);
// client's methods have been tested on ClusterInfoServiceImplTest
transport.close();
+ try {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
+ ClusterDescriptor.getInstance().getConfig().getClusterInfoRpcPort()));
+ transport.open();
+
+ // connection success means OK.
+ client = new ClusterInfoService.Client(new TBinaryProtocol(transport));
+ Assert.assertNotNull(client);
+ // client's methods have been tested on ClusterInfoServiceImplTest
+ transport.close();
+ } catch (TTransportException e) {
+ Assert.fail(e.getMessage());
+ }
}
}
diff --git a/compile-tools/thrift/pom.xml b/compile-tools/thrift/pom.xml
index 9087759..2b829f8 100644
--- a/compile-tools/thrift/pom.xml
+++ b/compile-tools/thrift/pom.xml
@@ -98,7 +98,7 @@
<goal>wget</goal>
</goals>
<configuration>
- <url>https://archive.apache.org/dist/thrift/${thrift.version}/thrift-${thrift.version}.zip</url>
+ <url>https://archive.apache.org/dist/thrift/${thrift.version}/thrift-${thrift.version}.tar.gz</url>
<unpack>true</unpack>
<outputDirectory>${project.build.directory}</outputDirectory>
</configuration>
diff --git a/jdbc/src/main/feature/feature.xml b/jdbc/src/main/feature/feature.xml
index 5704a7a..933a751 100644
--- a/jdbc/src/main/feature/feature.xml
+++ b/jdbc/src/main/feature/feature.xml
@@ -27,7 +27,7 @@
<bundle>mvn:org.apache.iotdb/service-rpc/${project.version}</bundle>
<bundle>mvn:org.apache.iotdb/iotdb-thrift/${project.version}</bundle>
<bundle>mvn:org.apache.iotdb/hadoop-tsfile/${project.version}</bundle>
- <bundle>mvn:org.apache.thrift/libthrift/0.13.0</bundle>
+ <bundle>mvn:org.apache.thrift/libthrift/0.14.1</bundle>
<bundle>mvn:org.xerial.snappy/snappy-java/1.1.7.2</bundle>
<bundle>mvn:commons-io/commons-io/2.5</bundle>
<bundle>wrap:mvn:org.apache.hadoop/hadoop-core/1.2.1</bundle>
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 8e01fd8..8a2bf7d 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.jdbc;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSIService;
@@ -30,6 +31,7 @@ import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -73,6 +75,7 @@ public class IoTDBConnection implements Connection {
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TTransport transport;
+ private TConfiguration tConfiguration = TConfigurationConst.defaultTConfiguration;
/**
* Timeout of query can be set by users. Unit: s If not set, default value 0 will be used, which
* will use server configuration.
@@ -438,7 +441,11 @@ public class IoTDBConnection implements Connection {
RpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize());
transport =
RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(params.getHost(), params.getPort(), Config.DEFAULT_CONNECTION_TIMEOUT_MS));
+ new TSocket(
+ tConfiguration,
+ params.getHost(),
+ params.getPort(),
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS));
if (!transport.isOpen()) {
transport.open();
}
diff --git a/pom.xml b/pom.xml
index aed72ae..a148d16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,7 @@
<common.io.version>2.5</common.io.version>
<commons.collections4>4.4</commons.collections4>
<!-- keep consistent with client-cpp/tools/thrift/pom.xml-->
- <thrift.version>0.13.0</thrift.version>
+ <thrift.version>0.14.1</thrift.version>
<airline.version>0.8</airline.version>
<jackson.version>2.11.0</jackson.version>
<antlr4.version>4.8-1</antlr4.version>
@@ -993,8 +993,8 @@
</activation>
<properties>
<os.classifier>linux-x86_64</os.classifier>
- <thrift.download-url>https://github.com/jt2594838/mvn-thrift-compiler/raw/master/thrift_0.12.0_0.13.0_linux.exe</thrift.download-url>
- <thrift.executable>thrift_0.12.0_0.13.0_linux.exe</thrift.executable>
+ <thrift.download-url>https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-ubuntu</thrift.download-url>
+ <thrift.executable>thrift_0.14.1_linux.exe</thrift.executable>
<thrift.skip-making-executable>false</thrift.skip-making-executable>
<thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
<thrift.exec-cmd.args>+x ${project.build.directory}/tools/${thrift.executable}</thrift.exec-cmd.args>
@@ -1009,8 +1009,8 @@
</activation>
<properties>
<os.classifier>mac-x86_64</os.classifier>
- <thrift.download-url>https://github.com/jt2594838/mvn-thrift-compiler/raw/master/thrift_0.12.0_0.13.0_mac.exe</thrift.download-url>
- <thrift.executable>thrift_0.12.0_0.13.0_mac.exe</thrift.executable>
+ <thrift.download-url>https://github.com/apache/iotdb-bin-resources/raw/main/compile-tools/thrift-0.14-MacOS</thrift.download-url>
+ <thrift.executable>thrift_0.14.1_mac.exe</thrift.executable>
<thrift.skip-making-executable>false</thrift.skip-making-executable>
<thrift.exec-cmd.executable>chmod</thrift.exec-cmd.executable>
<thrift.exec-cmd.args>+x ${project.build.directory}/tools/${thrift.executable}</thrift.exec-cmd.args>
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index f7b619a..15d2b35 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -135,7 +135,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.apache.thrift.TException;
-import org.apache.thrift.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -162,7 +161,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/** Thrift RPC implementation at server side. */
-public class TSServiceImpl implements TSIService.Iface, ServerContext {
+public class TSServiceImpl implements TSIService.Iface {
private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceImpl.class);
private static final Logger SLOW_SQL_LOGGER = LoggerFactory.getLogger("SLOW_SQL");
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
index 6485387..5d538ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncConstant.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.sync.conf;
+import org.apache.iotdb.rpc.RpcUtils;
+
public class SyncConstant {
private SyncConstant() {}
@@ -35,7 +37,8 @@ public class SyncConstant {
public static final String SYNC_DIR_NAME_SEPARATOR = "_";
/** Split data file, block size at each transmission */
- public static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024;
+ public static final int DATA_CHUNK_SIZE =
+ Math.min(64 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
// sender section
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 9dcc807..8acd63e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -37,16 +37,18 @@ import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.SyncUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TSocketWrapper;
import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
import org.apache.commons.io.FileUtils;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -133,6 +135,8 @@ public class SyncClient implements ISyncClient {
init();
}
+ private TConfiguration tConfiguration = TConfigurationConst.defaultTConfiguration;
+
public static SyncClient getInstance() {
return InstanceHolder.INSTANCE;
}
@@ -292,17 +296,18 @@ public class SyncClient implements ISyncClient {
public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
RpcTransportFactory.setDefaultBufferCapacity(ioTDBConfig.getThriftDefaultBufferSize());
RpcTransportFactory.setThriftMaxFrameSize(ioTDBConfig.getThriftMaxFrameSize());
- transport =
- RpcTransportFactory.INSTANCE.getTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
- TProtocol protocol;
- if (ioTDBConfig.isRpcThriftCompressionEnable()) {
- protocol = new TCompactProtocol(transport);
- } else {
- protocol = new TBinaryProtocol(transport);
- }
-
- serviceClient = new SyncService.Client(protocol);
try {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ TSocketWrapper.wrap(tConfiguration, serverIp, serverPort, TIMEOUT_MS));
+ TProtocol protocol;
+ if (ioTDBConfig.isRpcThriftCompressionEnable()) {
+ protocol = new TCompactProtocol(transport);
+ } else {
+ protocol = new TBinaryProtocol(transport);
+ }
+ serviceClient = new SyncService.Client(protocol);
+
if (!transport.isOpen()) {
transport.open();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 63400e1..c35a9d8 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -40,9 +40,11 @@ import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TSocketWrapper;
import org.apache.commons.io.FileUtils;
-import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -77,6 +79,8 @@ public class EnvironmentUtils {
private static IoTDB daemon;
+ private static TConfiguration tConfiguration = TConfigurationConst.defaultTConfiguration;
+
public static boolean examinePorts =
Boolean.parseBoolean(System.getProperty("test.port.closed", "false"));
@@ -153,7 +157,7 @@ public class EnvironmentUtils {
}
private static boolean examinePorts() {
- TTransport transport = new TSocket("127.0.0.1", 6667, 100);
+ TTransport transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 6667, 100);
if (!transport.isOpen()) {
try {
transport.open();
@@ -165,7 +169,7 @@ public class EnvironmentUtils {
}
}
// try sync service
- transport = new TSocket("127.0.0.1", 5555, 100);
+ transport = TSocketWrapper.wrap(tConfiguration, "127.0.0.1", 5555, 100);
if (!transport.isOpen()) {
try {
transport.open();
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
index 2bc7fbd..9687dfb 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.rpc;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -58,6 +59,18 @@ public class AutoScalingBufferReadTransport extends NonOpenTransport {
}
@Override
+ public TConfiguration getConfiguration() {
+ // this method should never be called.
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes) throws TTransportException {}
+
+ @Override
public final byte[] getBuffer() {
return buf.array();
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
index f37a0af..410b0f0 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.rpc;
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TTransportException;
+
/**
* Note that this class is mainly copied from class {@link
* org.apache.thrift.transport.AutoExpandingBufferWriteTransport}. since that class does not support
@@ -62,4 +65,16 @@ public class AutoScalingBufferWriteTransport extends NonOpenTransport {
public byte[] getBuffer() {
return buf.array();
}
+
+ @Override
+ public TConfiguration getConfiguration() {
+ // this method should never be called.
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws TTransportException {}
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes) throws TTransportException {}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
index 7fd4307..1a70334 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.rpc;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
@SuppressWarnings("java:S1135") // ignore todos
@@ -50,7 +51,7 @@ public class RpcTransportFactory extends TTransportFactory {
}
@Override
- public TTransport getTransport(TTransport trans) {
+ public TTransport getTransport(TTransport trans) throws TTransportException {
return inner.getTransport(trans);
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 133fd95..74f2392 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.rpc;
-import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TFramedTransport;
import java.io.IOException;
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TConfigurationConst.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TConfigurationConst.java
new file mode 100644
index 0000000..d252093
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TConfigurationConst.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.TConfiguration;
+
+public class TConfigurationConst {
+ // https://github.com/apache/thrift/blob/master/doc/specs/thrift-tconfiguration.md
+ public static TConfiguration defaultTConfiguration =
+ new TConfiguration(
+ RpcUtils.THRIFT_FRAME_MAX_SIZE + 4,
+ RpcUtils.THRIFT_FRAME_MAX_SIZE,
+ TConfiguration.DEFAULT_RECURSION_DEPTH);
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 7e32d51..036881d 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -18,11 +18,13 @@
*/
package org.apache.iotdb.rpc;
-import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.layered.TFramedTransport;
+// https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md
public class TElasticFramedTransport extends TTransport {
public static class Factory extends TTransportFactory {
@@ -139,6 +141,22 @@ public class TElasticFramedTransport extends TTransport {
}
@Override
+ public TConfiguration getConfiguration() {
+ return underlying.getConfiguration();
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws TTransportException {
+ // do nothing now.
+ }
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes) throws TTransportException {
+ // do nothing now.
+ // here we can do some checks, e.g., see whether the memory is enough.
+ }
+
+ @Override
public void write(byte[] buf, int off, int len) {
writeBuffer.write(buf, off, len);
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
new file mode 100644
index 0000000..23fdefe
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+/**
+ * In Thrift 0.14.1, TNonblockingSocket's constructor declares an exception that never actually
+ * happens, so we screen the exception here. See https://issues.apache.org/jira/browse/THRIFT-5412
+ */
+public class TNonblockingSocketWrapper {
+
+ public static TNonblockingSocket wrap(String host, int port) throws IOException {
+ try {
+ return new TNonblockingSocket(host, port);
+ } catch (TTransportException e) {
+ // never happen
+ return null;
+ }
+ }
+
+ public static TNonblockingSocket wrap(String host, int port, int timeout) throws IOException {
+ try {
+ return new TNonblockingSocket(host, port, timeout);
+ } catch (TTransportException e) {
+ // never happen
+ return null;
+ }
+ }
+
+ public static TNonblockingSocket wrap(SocketChannel socketChannel) throws IOException {
+ try {
+ return new TNonblockingSocket(socketChannel);
+ } catch (TTransportException e) {
+ // never happen
+ return null;
+ }
+ }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSocketWrapper.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSocketWrapper.java
new file mode 100644
index 0000000..9ba3cc8
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSocketWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * In Thrift 0.14.1, TSocket's constructor declares an exception that never actually happens, so
+ * we screen the exception here. See https://issues.apache.org/jira/browse/THRIFT-5412
+ */
+public class TSocketWrapper {
+
+ public static TSocket wrap(TConfiguration config, String host, int port, int timeout) {
+ try {
+ return new TSocket(config, host, port, timeout);
+ } catch (TTransportException e) {
+ // never happen
+ return null;
+ }
+ }
+
+ public static TSocket wrap(
+ TConfiguration config, String host, int port, int socketTimeout, int connectTimeout) {
+ try {
+ return new TSocket(config, host, port, socketTimeout, connectTimeout);
+ } catch (TTransportException e) {
+ // never happen
+ return null;
+ }
+ }
+
+ public static TSocket wrap(TConfiguration config, String host, int port) {
+ try {
+ return new TSocket(config, host, port);
+ } catch (TTransportException e) {
+ // never happen
+ return null;
+ }
+ }
+
+ public static TSocket wrap(String host, int port) {
+ try {
+ return new TSocket(host, port);
+ } catch (TTransportException e) {
+ // never happen
+ return null;
+ }
+ }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 17e0a10..402523d 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
@@ -88,11 +89,15 @@ public class SessionConnection {
private void init(EndPoint endPoint) throws IoTDBConnectionException {
RpcTransportFactory.setDefaultBufferCapacity(session.thriftDefaultBufferSize);
RpcTransportFactory.setThriftMaxFrameSize(session.thriftMaxFrameSize);
- transport =
- RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs));
-
try {
+ transport =
+ RpcTransportFactory.INSTANCE.getTransport(
+ // as there is a try-catch already, we do not need to use TSocketWrapper.wrap
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ endPoint.getIp(),
+ endPoint.getPort(),
+ session.connectionTimeoutInMs));
transport.open();
} catch (TTransportException e) {
throw new IoTDBConnectionException(e);
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 28e2a73..b13b81b 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -224,7 +224,23 @@ public class SessionPoolTest {
pool.close();
return;
} catch (StatementExecutionException e) {
- fail("should be TTransportException but get an exception: " + e.getMessage());
+ // It is unclear why, but the first call to wrapper.hasNext() causes an
+ // InterruptedException, which IoTDB wraps as a StatementExecutionException.
+ // The second call can make sure that the thrift server's connection is
+ // closed.
+ try {
+ while (wrapper.hasNext()) {
+ wrapper.next();
+ }
+ } catch (IoTDBConnectionException ec) {
+ pool.closeResultSet(wrapper);
+ EnvironmentUtils.reactiveDaemon();
+ correctQuery(pool);
+ pool.close();
+ } catch (StatementExecutionException es) {
+ fail("should be TTransportException but get an exception: " + e.getMessage());
+ }
+ return;
}
fail("should throw exception but not");
}
diff --git a/session/src/test/resources/logback.xml b/session/src/test/resources/logback.xml
new file mode 100644
index 0000000..d14ffb7
--- /dev/null
+++ b/session/src/test/resources/logback.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration debug="false">
+ <property name="LOG_PATH" value="target/logs"/>
+ <!-- prevent logback from outputting its own status at the start of every log -->
+ <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+ <appender class="ch.qos.logback.core.ConsoleAppender" name="stdout">
+ <Target>System.out</Target>
+ <encoder>
+ <pattern>%-5p [%d] [%thread] %C:%L - %m %n</pattern>
+ <charset>utf-8</charset>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>DEBUG</level>
+ </filter>
+ </appender>
+ <logger name="org.apache.iotdb.session" level="INFO"/>
+ <root level="WARN">
+ <appender-ref ref="stdout"/>
+ </root>
+</configuration>