You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/10/27 03:48:04 UTC
[iotdb] branch master updated: [IOTDB-4679] Unable to connect to iotdb in private-network using port-mapping (#7721)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dff3d3727e [IOTDB-4679] Unable to connect to iotdb in private-network using port-mapping (#7721)
dff3d3727e is described below
commit dff3d3727e485700951b5ae3170cdc8f74df28a4
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Thu Oct 27 11:48:00 2022 +0800
[IOTDB-4679] Unable to connect to iotdb in private-network using port-mapping (#7721)
---
docs/UserGuide/API/Interface-Comparison.md | 44 ++--
docs/UserGuide/API/Programming-Java-Native-API.md | 2 +-
docs/zh/UserGuide/API/Interface-Comparison.md | 44 ++--
.../UserGuide/API/Programming-Java-Native-API.md | 2 +-
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 8 +-
.../org/apache/iotdb/db/it/env/StandaloneEnv.java | 4 +-
.../apache/iotdb/session/pool/SessionPoolTest.java | 4 +-
.../resources/conf/iotdb-datanode.properties | 1 +
.../java/org/apache/iotdb/db/service/DataNode.java | 4 -
.../java/org/apache/iotdb/session/ISession.java | 4 +-
.../java/org/apache/iotdb/session/Session.java | 258 ++++++++++++++-------
.../org/apache/iotdb/session/SessionConfig.java | 2 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 48 ++--
.../apache/iotdb/session/SessionCacheLeaderUT.java | 4 +-
.../java/org/apache/iotdb/db/sql/ClusterIT.java | 2 +-
15 files changed, 262 insertions(+), 169 deletions(-)
diff --git a/docs/UserGuide/API/Interface-Comparison.md b/docs/UserGuide/API/Interface-Comparison.md
index 14761497af..ec7f287bdb 100644
--- a/docs/UserGuide/API/Interface-Comparison.md
+++ b/docs/UserGuide/API/Interface-Comparison.md
@@ -25,26 +25,26 @@ This chapter mainly compares the differences between Java Native API and python
-| Order | API name and function | Java API | Python API | <div style="width: 200pt">API Comparison</div> |
-| ----- | ------------------------------------- | ------------------------------------------------------------ | :----------------------------------------------------------- | ------------------------------------------------------------ |
-| 1 | Initialize session | `Session.Builder.build(); Session.Builder().host(String host).port(int port).build(); Session.Builder().nodeUrls(List<String> nodeUrls).build(); Session.Builder().fetchSize(int fetchSize).username(String username).password(String password).thriftDefaultBufferSize(int thriftDefaultBufferSize).thriftMaxFrameSize(int thriftMaxFrameSize).enableCacheLeader(boolean enableCacheLeader).version(Version version).build();` | `Session [...]
-| 2 | Open session | `void open() void open(boolean enableRPCCompression)` | `session.open(enable_rpc_compression=False)` | |
-| 3 | Close session | `void close()` | `session.close()` | |
-| 4 | Set storage group | `void setStorageGroup(String storageGroupId)` | `session.set_storage_group(group_name)` | |
-| 5 | Delete storage group | `void deleteStorageGroup(String storageGroup) void deleteStorageGroups(List<String> storageGroups)` | `session.delete_storage_group(group_name) session.delete_storage_groups(group_name_lst)` | |
-| 6 | Create timeseries | `void createTimeseries(String path, TSDataType dataType,TSEncoding encoding, CompressionType compressor, Map<String, String> props,Map<String, String> tags, Map<String, String> attributes, String measurementAlias) void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,List<TSEncoding> encodings, List<CompressionType> compressors,List<Map<String, String>> propsList, List<Map<String, String>> tagsList,Li [...]
-| 7 | Create aligned timeseries | `void createAlignedTimeseries(String prefixPath, List<String> measurements,List<TSDataType> dataTypes, List<TSEncoding> encodings,CompressionType compressor, List<String> measurementAliasList);` | `session.create_aligned_time_series(device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst)` | |
-| 8 | Delete timeseries | `void deleteTimeseries(String path) void deleteTimeseries(List<String> paths)` | `session.delete_time_series(paths_list)` | Python native API is missing an API to delete a time series |
-| 9 | Detect whether the timeseries exists | `boolean checkTimeseriesExists(String path)` | `session.check_time_series_exists(path)` | |
-| 10 | Metadata template | `public void createSchemaTemplate(Template template);` | | |
-| 11 | Insert tablet | `void insertTablet(Tablet tablet) void insertTablets(Map<String, Tablet> tablets)` | `session.insert_tablet(tablet_) session.insert_tablets(tablet_lst)` | |
-| 12 | Insert record | `void insertRecord(String prefixPath, long time, List<String> measurements,List<TSDataType> types, List<Object> values) void insertRecords(List<String> deviceIds,List<Long> times,List<List<String>> measurementsList,List<List<TSDataType>> typesList,List<List<Object>> valuesList) void insertRecordsOfOneDevice(String deviceId, List<Long> times,List<List<Object>> valuesList)` | `session.insert_record(device_id, timestamp, measur [...]
-| 13 | Write with type inference | `void insertRecord(String prefixPath, long time, List<String> measurements, List<String> values) void insertRecords(List<String> deviceIds, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList) void insertStringRecordsOfOneDevice(String deviceId, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList)` | `session.insert_str_record(device_id, timestamp, measurements [...]
-| 14 | Write of aligned time series | `insertAlignedRecord insertAlignedRecords insertAlignedRecordsOfOneDevice insertAlignedStringRecordsOfOneDevice insertAlignedTablet insertAlignedTablets` | `insert_aligned_record insert_aligned_records insert_aligned_records_of_one_device insert_aligned_tablet insert_aligned_tablets` | Python native API is missing the writing of aligned time series with judgment type |
-| 15 | Data deletion | `void deleteData(String path, long endTime) void deleteData(List<String> paths, long endTime)` | | 1. The python native API lacks an API to delete a piece of data<br/>2. The python native API lacks an API to delete multiple pieces of data |
-| 16 | Data query | `SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)` | | 1. The python native API lacks an API for querying the original data<br/>2. The python native API lacks an API to query the data whose last timestamp is greater than or equal to a certain time point |
-| 17 | Iotdb SQL API - query statement | `SessionDataSet executeQueryStatement(String sql)` | `session.execute_query_statement(sql)` | |
-| 18 | Iotdb SQL API - non query statement | `void executeNonQueryStatement(String sql)` | `session.execute_non_query_statement(sql)` | |
+| Order | API name and function | Java API [...]
+| ----- | ------------------------------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| 1 | Initialize session | `Session.Builder.build(); Session.Builder().host(String host).port(int port).build(); Session.Builder().nodeUrls(List<String> nodeUrls).build(); Session.Builder().fetchSize(int fetchSize).username(String username).password(String password).thriftDefaultBufferSize(int thriftDefaultBufferSize).thriftMaxFrameSize(int thriftMaxFrameSize).enableRedirection(boolean enableCacheLeader).version(Version version).build();` [...]
+| 2 | Open session | `void open() void open(boolean enableRPCCompression)` [...]
+| 3 | Close session | `void close()` [...]
+| 4 | Set storage group | `void setStorageGroup(String storageGroupId)` [...]
+| 5 | Delete storage group | `void deleteStorageGroup(String storageGroup) void deleteStorageGroups(List<String> storageGroups)` [...]
+| 6 | Create timeseries | `void createTimeseries(String path, TSDataType dataType,TSEncoding encoding, CompressionType compressor, Map<String, String> props,Map<String, String> tags, Map<String, String> attributes, String measurementAlias) void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,List<TSEncoding> encodings, List<CompressionType> compressors,List<Map<String, String>> propsList, List<Map<String, String>> tagsList,Li [...]
+| 7 | Create aligned timeseries | `void createAlignedTimeseries(String prefixPath, List<String> measurements,List<TSDataType> dataTypes, List<TSEncoding> encodings,CompressionType compressor, List<String> measurementAliasList);` [...]
+| 8 | Delete timeseries | `void deleteTimeseries(String path) void deleteTimeseries(List<String> paths)` [...]
+| 9 | Detect whether the timeseries exists | `boolean checkTimeseriesExists(String path)` [...]
+| 10 | Metadata template | `public void createSchemaTemplate(Template template);` [...]
+| 11 | Insert tablet | `void insertTablet(Tablet tablet) void insertTablets(Map<String, Tablet> tablets)` [...]
+| 12 | Insert record | `void insertRecord(String prefixPath, long time, List<String> measurements,List<TSDataType> types, List<Object> values) void insertRecords(List<String> deviceIds,List<Long> times,List<List<String>> measurementsList,List<List<TSDataType>> typesList,List<List<Object>> valuesList) void insertRecordsOfOneDevice(String deviceId, List<Long> times,List<List<Object>> valuesList)` [...]
+| 13 | Write with type inference | `void insertRecord(String prefixPath, long time, List<String> measurements, List<String> values) void insertRecords(List<String> deviceIds, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList) void insertStringRecordsOfOneDevice(String deviceId, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList)` [...]
+| 14 | Write of aligned time series | `insertAlignedRecord insertAlignedRecords insertAlignedRecordsOfOneDevice insertAlignedStringRecordsOfOneDevice insertAlignedTablet insertAlignedTablets` [...]
+| 15 | Data deletion | `void deleteData(String path, long endTime) void deleteData(List<String> paths, long endTime)` [...]
+| 16 | Data query | `SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)` [...]
+| 17 | Iotdb SQL API - query statement | `SessionDataSet executeQueryStatement(String sql)` [...]
+| 18 | Iotdb SQL API - non query statement | `void executeNonQueryStatement(String sql)` [...]
| 19 | Test API | `void testInsertRecord(String deviceId, long time, List<String> measurements, List<String> values) void testInsertRecord(String deviceId, long time, List<String> measurements,List<TSDataType> types, List<Object> values) void testInsertRecords(List<String> deviceIds, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList) void testInsertRecords(List<String> deviceIds, List<Long> times,List<List [...]
-| 20 | Connection pool for native interfaces | `SessionPool` | | Python API has no connection pool for native API |
-| 21 | API related to cluster information | `iotdb-thrift-cluster` | | Python API does not support interfaces related to cluster information |
\ No newline at end of file
+| 20 | Connection pool for native interfaces | `SessionPool` [...]
+| 21 | API related to cluster information | `iotdb-thrift-cluster` [...]
\ No newline at end of file
diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md b/docs/UserGuide/API/Programming-Java-Native-API.md
index a53eb8bb39..87c65a2daf 100644
--- a/docs/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/UserGuide/API/Programming-Java-Native-API.md
@@ -86,7 +86,7 @@ session =
.password(String password)
.thriftDefaultBufferSize(int thriftDefaultBufferSize)
.thriftMaxFrameSize(int thriftMaxFrameSize)
- .enableCacheLeader(boolean enableCacheLeader)
+ .enableRedirection(boolean enableRedirection)
.version(Version version)
.build();
```
diff --git a/docs/zh/UserGuide/API/Interface-Comparison.md b/docs/zh/UserGuide/API/Interface-Comparison.md
index 89ae996304..42cf2c6d61 100644
--- a/docs/zh/UserGuide/API/Interface-Comparison.md
+++ b/docs/zh/UserGuide/API/Interface-Comparison.md
@@ -25,26 +25,26 @@
-| 序号 | 接口名称以及作用 | Java接口函数 | Python接口函数 | <div style="width: 200pt">接口对比说明</div> |
-| ---- | ------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
-| 1 | 初始化Session | `Session.Builder.build(); Session.Builder().host(String host).port(int port).build(); Session.Builder().nodeUrls(List<String> nodeUrls).build(); Session.Builder().fetchSize(int fetchSize).username(String username).password(String password).thriftDefaultBufferSize(int thriftDefaultBufferSize).thriftMaxFrameSize(int thriftMaxFrameSize).enableCacheLeader(boolean enableCacheLeader).version(Version version).build();` | `Session(ip, port_, user [...]
-| 2 | 开启 Session | `void open() void open(boolean enableRPCCompression)` | `session.open(enable_rpc_compression=False)` | |
-| 3 | 关闭 Session | `void close()` | `session.close()` | |
-| 4 | 设置存储组 | `void setStorageGroup(String storageGroupId)` | `session.set_storage_group(group_name)` | |
-| 5 | 删除存储组 | `void deleteStorageGroup(String storageGroup) void deleteStorageGroups(List<String> storageGroups)` | `session.delete_storage_group(group_name) session.delete_storage_groups(group_name_lst)` | |
-| 6 | 创建时间序列 | `void createTimeseries(String path, TSDataType dataType,TSEncoding encoding, CompressionType compressor, Map<String, String> props,Map<String, String> tags, Map<String, String> attributes, String measurementAlias) void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,List<TSEncoding> encodings, List<CompressionType> compressors,List<Map<String, String>> propsList, List<Map<String, String>> tagsList,List<Map<String, Str [...]
-| 7 | 创建对齐时间序列 | `void createAlignedTimeseries(String prefixPath, List<String> measurements,List<TSDataType> dataTypes, List<TSEncoding> encodings,CompressionType compressor, List<String> measurementAliasList);` | `session.create_aligned_time_series(device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst)` | |
-| 8 | 删除时间序列 | `void deleteTimeseries(String path) void deleteTimeseries(List<String> paths)` | `session.delete_time_series(paths_list)` | Python原生接口缺少删除一个时间序列的接口 |
-| 9 | 检测时间序列是否存在 | `boolean checkTimeseriesExists(String path)` | `session.check_time_series_exists(path)` | |
-| 10 | 元数据模版 | `public void createSchemaTemplate(Template template);` | | |
-| 11 | 插入Tablet | `void insertTablet(Tablet tablet) void insertTablets(Map<String, Tablet> tablets)` | `session.insert_tablet(tablet_) session.insert_tablets(tablet_lst)` | |
-| 12 | 插入Record | `void insertRecord(String prefixPath, long time, List<String> measurements,List<TSDataType> types, List<Object> values) void insertRecords(List<String> deviceIds,List<Long> times,List<List<String>> measurementsList,List<List<TSDataType>> typesList,List<List<Object>> valuesList) void insertRecordsOfOneDevice(String deviceId, List<Long> times,List<List<Object>> valuesList)` | `session.insert_record(device_id, timestamp, measurements_, data_t [...]
-| 13 | 带有类型推断的写入 | `void insertRecord(String prefixPath, long time, List<String> measurements, List<String> values) void insertRecords(List<String> deviceIds, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList) void insertStringRecordsOfOneDevice(String deviceId, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList)` | `session.insert_str_record(device_id, timestamp, measurements, string_values)` | 1. [...]
-| 14 | 对齐时间序列的写入 | `insertAlignedRecord insertAlignedRecords insertAlignedRecordsOfOneDevice insertAlignedStringRecordsOfOneDevice insertAlignedTablet insertAlignedTablets` | `insert_aligned_record insert_aligned_records insert_aligned_records_of_one_device insert_aligned_tablet insert_aligned_tablets` | Python原生接口缺少带有判断类型的对齐时间序列的写入 |
-| 15 | 数据删除 | `void deleteData(String path, long endTime) void deleteData(List<String> paths, long endTime)` | | 1.Python原生接口缺少删除一条数据的接口 2.Python原生接口缺少删除多条数据的接口 |
-| 16 | 数据查询 | `SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)` | | 1.Python原生接口缺少原始数据查询的接口 2.Python原生接口缺少查询最后一条时间戳大于等于某个时间点的数据的接口 |
-| 17 | IoTDB-SQL 接口-查询语句 | `SessionDataSet executeQueryStatement(String sql)` | `session.execute_query_statement(sql)` | |
-| 18 | IoTDB-SQL 接口-非查询语句 | `void executeNonQueryStatement(String sql)` | `session.execute_non_query_statement(sql)` | |
+| 序号 | 接口名称以及作用 | Java接口函数 [...]
+| ---- | ------------------------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...]
+| 1 | 初始化Session | `Session.Builder.build(); Session.Builder().host(String host).port(int port).build(); Session.Builder().nodeUrls(List<String> nodeUrls).build(); Session.Builder().fetchSize(int fetchSize).username(String username).password(String password).thriftDefaultBufferSize(int thriftDefaultBufferSize).thriftMaxFrameSize(int thriftMaxFrameSize).enableRedirection(boolean enableCacheLeader).version(Version version).build();` [...]
+| 2 | 开启 Session | `void open() void open(boolean enableRPCCompression)` [...]
+| 3 | 关闭 Session | `void close()` [...]
+| 4 | 设置存储组 | `void setStorageGroup(String storageGroupId)` [...]
+| 5 | 删除存储组 | `void deleteStorageGroup(String storageGroup) void deleteStorageGroups(List<String> storageGroups)` [...]
+| 6 | 创建时间序列 | `void createTimeseries(String path, TSDataType dataType,TSEncoding encoding, CompressionType compressor, Map<String, String> props,Map<String, String> tags, Map<String, String> attributes, String measurementAlias) void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,List<TSEncoding> encodings, List<CompressionType> compressors,List<Map<String, String>> propsList, List<Map<String, String>> tagsList,List<Map<String, Str [...]
+| 7 | 创建对齐时间序列 | `void createAlignedTimeseries(String prefixPath, List<String> measurements,List<TSDataType> dataTypes, List<TSEncoding> encodings,CompressionType compressor, List<String> measurementAliasList);` [...]
+| 8 | 删除时间序列 | `void deleteTimeseries(String path) void deleteTimeseries(List<String> paths)` [...]
+| 9 | 检测时间序列是否存在 | `boolean checkTimeseriesExists(String path)` [...]
+| 10 | 元数据模版 | `public void createSchemaTemplate(Template template);` [...]
+| 11 | 插入Tablet | `void insertTablet(Tablet tablet) void insertTablets(Map<String, Tablet> tablets)` [...]
+| 12 | 插入Record | `void insertRecord(String prefixPath, long time, List<String> measurements,List<TSDataType> types, List<Object> values) void insertRecords(List<String> deviceIds,List<Long> times,List<List<String>> measurementsList,List<List<TSDataType>> typesList,List<List<Object>> valuesList) void insertRecordsOfOneDevice(String deviceId, List<Long> times,List<List<Object>> valuesList)` [...]
+| 13 | 带有类型推断的写入 | `void insertRecord(String prefixPath, long time, List<String> measurements, List<String> values) void insertRecords(List<String> deviceIds, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList) void insertStringRecordsOfOneDevice(String deviceId, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList)` [...]
+| 14 | 对齐时间序列的写入 | `insertAlignedRecord insertAlignedRecords insertAlignedRecordsOfOneDevice insertAlignedStringRecordsOfOneDevice insertAlignedTablet insertAlignedTablets` [...]
+| 15 | 数据删除 | `void deleteData(String path, long endTime) void deleteData(List<String> paths, long endTime)` [...]
+| 16 | 数据查询 | `SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) SessionDataSet executeLastDataQuery(List<String> paths, long LastTime)` [...]
+| 17 | IoTDB-SQL 接口-查询语句 | `SessionDataSet executeQueryStatement(String sql)` [...]
+| 18 | IoTDB-SQL 接口-非查询语句 | `void executeNonQueryStatement(String sql)` [...]
| 19 | 测试接口 | `void testInsertRecord(String deviceId, long time, List<String> measurements, List<String> values) void testInsertRecord(String deviceId, long time, List<String> measurements,List<TSDataType> types, List<Object> values) void testInsertRecords(List<String> deviceIds, List<Long> times,List<List<String>> measurementsList, List<List<String>> valuesList) void testInsertRecords(List<String> deviceIds, List<Long> times,List<List<String>> measur [...]
-| 20 | 针对原生接口的连接池 | `SessionPool` | | Python接口无针对原生接口的连接池 |
-| 21 | 集群信息相关的接口 | `iotdb-thrift-cluster` | | Python接口不支持集群信息相关的接口 |
\ No newline at end of file
+| 20 | 针对原生接口的连接池 | `SessionPool` [...]
+| 21 | 集群信息相关的接口 | `iotdb-thrift-cluster` [...]
\ No newline at end of file
diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
index 4c0a82851e..ab3ebaa378 100644
--- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
@@ -89,7 +89,7 @@ session =
.password(String password)
.thriftDefaultBufferSize(int thriftDefaultBufferSize)
.thriftMaxFrameSize(int thriftMaxFrameSize)
- .enableCacheLeader(boolean enableCacheLeader)
+ .enableRedirection(boolean enableRedirection)
.version(Version version)
.build();
```
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 2f97f3b37c..4ac62251df 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -84,7 +84,7 @@ public interface BaseEnv {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
@@ -97,7 +97,7 @@ public interface BaseEnv {
ZoneId zoneId,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
- boolean enableCacheLeader,
+ boolean enableRedirection,
Version version)
throws IoTDBConnectionException {
Session session =
@@ -110,7 +110,7 @@ public interface BaseEnv {
zoneId,
thriftDefaultBufferSize,
thriftMaxFrameSize,
- enableCacheLeader,
+ enableRedirection,
version);
session.open();
@@ -127,7 +127,7 @@ public interface BaseEnv {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
session.open();
return session;
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index c11a6cb937..7e46ef360a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -158,7 +158,7 @@ public class StandaloneEnv implements BaseEnv {
ZoneId zoneId,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
- boolean enableCacheLeader,
+ boolean enableRedirection,
Version version)
throws IoTDBConnectionException {
Session session =
@@ -171,7 +171,7 @@ public class StandaloneEnv implements BaseEnv {
zoneId,
thriftDefaultBufferSize,
thriftMaxFrameSize,
- enableCacheLeader,
+ enableRedirection,
version);
session.open();
diff --git a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index ff9eef2e43..48888df242 100644
--- a/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/integration/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -461,7 +461,7 @@ public class SessionPoolTest {
.password("123")
.fetchSize(1)
.waitToGetSessionTimeoutInMs(2)
- .enableCacheLeader(true)
+ .enableRedirection(true)
.enableCompression(true)
.zoneId(ZoneOffset.UTC)
.connectionTimeoutInMs(3)
@@ -474,7 +474,7 @@ public class SessionPoolTest {
assertEquals(10, pool.getMaxSize());
assertEquals(1, pool.getFetchSize());
assertEquals(2, pool.getWaitToGetSessionTimeoutInMs());
- assertTrue(pool.isEnableCacheLeader());
+ assertTrue(pool.isEnableRedirection());
assertTrue(pool.isEnableCompression());
assertEquals(3, pool.getConnectionTimeoutInMs());
assertEquals(ZoneOffset.UTC, pool.getZoneId());
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 70a8436162..0b3761bc69 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -22,6 +22,7 @@
####################
# could set 0.0.0.0, 127.0.0.1(for local test) or ipv4 address
+# if enable redirection in session, rpc_address should be the ip which session can connect.
# Datatype: String
rpc_address=0.0.0.0
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index da94af0a18..ea9e8284bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -141,10 +141,6 @@ public class DataNode implements DataNodeMBean {
}
}
- // if client ip is the default address, set it same with internal ip
- if (config.getRpcAddress().equals("0.0.0.0")) {
- config.setRpcAddress(config.getInternalAddress());
- }
thisNode.setIp(config.getInternalAddress());
thisNode.setPort(config.getInternalPort());
}
diff --git a/session/src/main/java/org/apache/iotdb/session/ISession.java b/session/src/main/java/org/apache/iotdb/session/ISession.java
index 40e7031db8..427ced3466 100644
--- a/session/src/main/java/org/apache/iotdb/session/ISession.java
+++ b/session/src/main/java/org/apache/iotdb/session/ISession.java
@@ -430,9 +430,9 @@ public interface ISession extends AutoCloseable {
void setEnableQueryRedirection(boolean enableQueryRedirection);
- boolean isEnableCacheLeader();
+ boolean isEnableRedirection();
- void setEnableCacheLeader(boolean enableCacheLeader);
+ void setEnableRedirection(boolean enableRedirection);
void sortTablet(Tablet tablet);
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 4ed67f07fb..5252acbc56 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -127,7 +127,7 @@ public class Session implements ISession {
private boolean isClosed = true;
// Cluster version cache
- protected boolean enableCacheLeader;
+ protected boolean enableRedirection;
protected volatile Map<String, TEndPoint> deviceIdToEndpoint;
protected volatile Map<TEndPoint, SessionConnection> endPointToSessionConnection;
@@ -146,7 +146,7 @@ public class Session implements ISession {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
@@ -160,7 +160,7 @@ public class Session implements ISession {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
@@ -174,7 +174,7 @@ public class Session implements ISession {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
@@ -188,7 +188,7 @@ public class Session implements ISession {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
@@ -208,7 +208,7 @@ public class Session implements ISession {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
this.queryTimeoutInMs = queryTimeoutInMs;
}
@@ -223,12 +223,12 @@ public class Session implements ISession {
zoneId,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
public Session(
- String host, int rpcPort, String username, String password, boolean enableCacheLeader) {
+ String host, int rpcPort, String username, String password, boolean enableRedirection) {
this(
host,
rpcPort,
@@ -238,7 +238,7 @@ public class Session implements ISession {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- enableCacheLeader,
+ enableRedirection,
SessionConfig.DEFAULT_VERSION);
}
@@ -249,7 +249,7 @@ public class Session implements ISession {
String password,
int fetchSize,
ZoneId zoneId,
- boolean enableCacheLeader) {
+ boolean enableRedirection) {
this(
host,
rpcPort,
@@ -259,7 +259,7 @@ public class Session implements ISession {
zoneId,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- enableCacheLeader,
+ enableRedirection,
SessionConfig.DEFAULT_VERSION);
}
@@ -273,7 +273,7 @@ public class Session implements ISession {
ZoneId zoneId,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
- boolean enableCacheLeader,
+ boolean enableRedirection,
Version version) {
this.defaultEndPoint = new TEndPoint(host, rpcPort);
this.username = username;
@@ -282,7 +282,7 @@ public class Session implements ISession {
this.zoneId = zoneId;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
- this.enableCacheLeader = enableCacheLeader;
+ this.enableRedirection = enableRedirection;
this.version = version;
}
@@ -295,7 +295,7 @@ public class Session implements ISession {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
@@ -313,7 +313,7 @@ public class Session implements ISession {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
@@ -326,7 +326,7 @@ public class Session implements ISession {
zoneId,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_VERSION);
}
@@ -338,7 +338,7 @@ public class Session implements ISession {
ZoneId zoneId,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
- boolean enableCacheLeader,
+ boolean enableRedirection,
Version version) {
this.nodeUrls = nodeUrls;
this.username = username;
@@ -347,7 +347,7 @@ public class Session implements ISession {
this.zoneId = zoneId;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
- this.enableCacheLeader = enableCacheLeader;
+ this.enableRedirection = enableRedirection;
this.version = version;
}
@@ -393,7 +393,7 @@ public class Session implements ISession {
defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
isClosed = false;
- if (enableCacheLeader || enableQueryRedirection) {
+ if (enableRedirection || enableQueryRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
endPointToSessionConnection = new ConcurrentHashMap<>();
endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
@@ -406,7 +406,7 @@ public class Session implements ISession {
return;
}
try {
- if (enableCacheLeader) {
+ if (enableRedirection) {
for (SessionConnection sessionConnection : endPointToSessionConnection.values()) {
sessionConnection.close();
}
@@ -831,6 +831,21 @@ public class Session implements ISession {
getSessionConnection(prefixPath).insertRecord(request);
} catch (RedirectException e) {
handleRedirection(prefixPath, e.getEndPoint());
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(prefixPath) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(prefixPath));
+ deviceIdToEndpoint.remove(prefixPath);
+
+ // reconnect with default connection
+ try {
+ defaultSessionConnection.insertRecord(request);
+ } catch (RedirectException ignored) {
+ }
+ } else {
+ throw e;
+ }
}
}
@@ -840,12 +855,27 @@ public class Session implements ISession {
getSessionConnection(deviceId).insertRecord(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(deviceId) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(deviceId));
+ deviceIdToEndpoint.remove(deviceId);
+
+ // reconnect with default connection
+ try {
+ defaultSessionConnection.insertRecord(request);
+ } catch (RedirectException ignored) {
+ }
+ } else {
+ throw e;
+ }
}
}
private SessionConnection getSessionConnection(String deviceId) {
TEndPoint endPoint;
- if (enableCacheLeader
+ if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
&& (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
return endPointToSessionConnection.get(endPoint);
@@ -862,7 +892,7 @@ public class Session implements ISession {
// TODO https://issues.apache.org/jira/browse/IOTDB-1399
private void removeBrokenSessionConnection(SessionConnection sessionConnection) {
// remove the cached broken leader session
- if (enableCacheLeader) {
+ if (enableRedirection) {
TEndPoint endPoint = null;
for (Iterator<Entry<TEndPoint, SessionConnection>> it =
endPointToSessionConnection.entrySet().iterator();
@@ -885,9 +915,12 @@ public class Session implements ISession {
}
}
- private void handleRedirection(String deviceId, TEndPoint endpoint)
- throws IoTDBConnectionException {
- if (enableCacheLeader) {
+ private void handleRedirection(String deviceId, TEndPoint endpoint) {
+ if (enableRedirection) {
+ // no need to redirection
+ if (endpoint.ip.equals("0.0.0.0")) {
+ return;
+ }
AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
deviceIdToEndpoint.put(deviceId, endpoint);
SessionConnection connection =
@@ -903,7 +936,7 @@ public class Session implements ISession {
});
if (connection == null) {
deviceIdToEndpoint.remove(deviceId);
- throw new IoTDBConnectionException(exceptionReference.get());
+ logger.warn("Can not redirect to {}, because session can not connect to it.", endpoint);
}
}
}
@@ -1135,7 +1168,7 @@ public class Session implements ISession {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
- if (enableCacheLeader) {
+ if (enableRedirection) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, false);
} else {
TSInsertStringRecordsReq request;
@@ -1153,11 +1186,7 @@ public class Session implements ISession {
}
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException e) {
- Map<String, TEndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, TEndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
- }
+ } catch (RedirectException ignored) {
}
}
}
@@ -1388,7 +1417,7 @@ public class Session implements ISession {
throw new IllegalArgumentException(
"prefixPaths, times, subMeasurementsList and valuesList's size should be equal");
}
- if (enableCacheLeader) {
+ if (enableRedirection) {
insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList, true);
} else {
TSInsertStringRecordsReq request;
@@ -1407,11 +1436,7 @@ public class Session implements ISession {
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException e) {
- Map<String, TEndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, TEndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
- }
+ } catch (RedirectException ignored) {
}
}
}
@@ -1568,7 +1593,7 @@ public class Session implements ISession {
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
- if (enableCacheLeader) {
+ if (enableRedirection) {
insertRecordsWithLeaderCache(
deviceIds, times, measurementsList, typesList, valuesList, false);
} else {
@@ -1587,11 +1612,7 @@ public class Session implements ISession {
}
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException e) {
- Map<String, TEndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, TEndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
- }
+ } catch (RedirectException ignored) {
}
}
}
@@ -1619,7 +1640,7 @@ public class Session implements ISession {
throw new IllegalArgumentException(
"prefixPaths, times, subMeasurementsList and valuesList's size should be equal");
}
- if (enableCacheLeader) {
+ if (enableRedirection) {
insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList, true);
} else {
TSInsertRecordsReq request;
@@ -1637,11 +1658,7 @@ public class Session implements ISession {
}
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException e) {
- Map<String, TEndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, TEndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
- }
+ } catch (RedirectException ignored) {
}
}
}
@@ -1707,6 +1724,21 @@ public class Session implements ISession {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(deviceId) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(deviceId));
+ deviceIdToEndpoint.remove(deviceId);
+
+ // reconnect with default connection
+ try {
+ defaultSessionConnection.insertRecordsOfOneDevice(request);
+ } catch (RedirectException ignored) {
+ }
+ } else {
+ throw e;
+ }
}
}
@@ -1750,6 +1782,21 @@ public class Session implements ISession {
getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(deviceId) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(deviceId));
+ deviceIdToEndpoint.remove(deviceId);
+
+ // reconnect with default connection
+ try {
+ defaultSessionConnection.insertStringRecordsOfOneDevice(req);
+ } catch (RedirectException ignored) {
+ }
+ } else {
+ throw e;
+ }
}
}
@@ -1833,6 +1880,21 @@ public class Session implements ISession {
getSessionConnection(deviceId).insertRecordsOfOneDevice(request);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(deviceId) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(deviceId));
+ deviceIdToEndpoint.remove(deviceId);
+
+ // reconnect with default connection
+ try {
+ defaultSessionConnection.insertRecordsOfOneDevice(request);
+ } catch (RedirectException ignored) {
+ }
+ } else {
+ throw e;
+ }
}
}
@@ -1876,6 +1938,21 @@ public class Session implements ISession {
getSessionConnection(deviceId).insertStringRecordsOfOneDevice(req);
} catch (RedirectException e) {
handleRedirection(deviceId, e.getEndPoint());
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(deviceId) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(deviceId));
+ deviceIdToEndpoint.remove(deviceId);
+
+ // reconnect with default connection
+ try {
+ defaultSessionConnection.insertStringRecordsOfOneDevice(req);
+ } catch (RedirectException ignored) {
+ }
+ } else {
+ throw e;
+ }
}
}
@@ -2178,6 +2255,21 @@ public class Session implements ISession {
getSessionConnection(tablet.deviceId).insertTablet(request);
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(tablet.deviceId) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(tablet.deviceId));
+ deviceIdToEndpoint.remove(tablet.deviceId);
+
+ // reconnect with default connection
+ try {
+ defaultSessionConnection.insertTablet(request);
+ } catch (RedirectException ignored) {
+ }
+ } else {
+ throw e;
+ }
}
}
@@ -2211,6 +2303,21 @@ public class Session implements ISession {
getSessionConnection(tablet.deviceId).insertTablet(request);
} catch (RedirectException e) {
handleRedirection(tablet.deviceId, e.getEndPoint());
+ } catch (IoTDBConnectionException e) {
+ if (enableRedirection
+ && !deviceIdToEndpoint.isEmpty()
+ && deviceIdToEndpoint.get(tablet.deviceId) != null) {
+ logger.warn("Session can not connect to {}", deviceIdToEndpoint.get(tablet.deviceId));
+ deviceIdToEndpoint.remove(tablet.deviceId);
+
+ // reconnect with default connection
+ try {
+ defaultSessionConnection.insertTablet(request);
+ } catch (RedirectException ignored) {
+ }
+ } else {
+ throw e;
+ }
}
}
@@ -2259,18 +2366,14 @@ public class Session implements ISession {
@Override
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- if (enableCacheLeader) {
+ if (enableRedirection) {
insertTabletsWithLeaderCache(tablets, sorted, false);
} else {
TSInsertTabletsReq request =
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, false);
try {
defaultSessionConnection.insertTablets(request);
- } catch (RedirectException e) {
- Map<String, TEndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, TEndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
- }
+ } catch (RedirectException ignored) {
}
}
}
@@ -2299,18 +2402,14 @@ public class Session implements ISession {
@Override
public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- if (enableCacheLeader) {
+ if (enableRedirection) {
insertTabletsWithLeaderCache(tablets, sorted, true);
} else {
TSInsertTabletsReq request =
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, true);
try {
defaultSessionConnection.insertTablets(request);
- } catch (RedirectException e) {
- Map<String, TEndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
- for (Map.Entry<String, TEndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
- handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
- }
+ } catch (RedirectException ignored) {
}
}
}
@@ -3094,21 +3193,18 @@ public class Session implements ISession {
try {
insertConsumer.insert(connection, recordsReq);
} catch (RedirectException e) {
- e.getDeviceEndPointMap()
- .forEach(
- (deviceId, endpoint) -> {
- try {
- handleRedirection(deviceId, endpoint);
- } catch (IoTDBConnectionException ioTDBConnectionException) {
- throw new CompletionException(ioTDBConnectionException);
- }
- });
+ e.getDeviceEndPointMap().forEach(this::handleRedirection);
} catch (StatementExecutionException e) {
throw new CompletionException(e);
} catch (IoTDBConnectionException e) {
// remove the broken session
removeBrokenSessionConnection(connection);
- throw new CompletionException(e);
+ try {
+ insertConsumer.insert(defaultSessionConnection, recordsReq);
+ } catch (IoTDBConnectionException | StatementExecutionException ex) {
+ throw new CompletionException(ex);
+ } catch (RedirectException ignored) {
+ }
}
},
OPERATION_EXECUTOR);
@@ -3144,13 +3240,13 @@ public class Session implements ISession {
}
@Override
- public boolean isEnableCacheLeader() {
- return enableCacheLeader;
+ public boolean isEnableRedirection() {
+ return enableRedirection;
}
@Override
- public void setEnableCacheLeader(boolean enableCacheLeader) {
- this.enableCacheLeader = enableCacheLeader;
+ public void setEnableRedirection(boolean enableRedirection) {
+ this.enableRedirection = enableRedirection;
}
public static class Builder {
@@ -3162,7 +3258,7 @@ public class Session implements ISession {
private ZoneId zoneId = null;
private int thriftDefaultBufferSize = SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY;
private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
- private boolean enableCacheLeader = SessionConfig.DEFAULT_CACHE_LEADER_MODE;
+ private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
private Version version = SessionConfig.DEFAULT_VERSION;
private long timeOut = SessionConfig.DEFAULT_QUERY_TIME_OUT;
@@ -3208,8 +3304,8 @@ public class Session implements ISession {
return this;
}
- public Builder enableCacheLeader(boolean enableCacheLeader) {
- this.enableCacheLeader = enableCacheLeader;
+ public Builder enableRedirection(boolean enableRedirection) {
+ this.enableRedirection = enableRedirection;
return this;
}
@@ -3245,7 +3341,7 @@ public class Session implements ISession {
zoneId,
thriftDefaultBufferSize,
thriftMaxFrameSize,
- enableCacheLeader,
+ enableRedirection,
version);
newSession.setEnableQueryRedirection(true);
return newSession;
@@ -3260,7 +3356,7 @@ public class Session implements ISession {
zoneId,
thriftDefaultBufferSize,
thriftMaxFrameSize,
- enableCacheLeader,
+ enableRedirection,
version);
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConfig.java b/session/src/main/java/org/apache/iotdb/session/SessionConfig.java
index f730b9715b..581d28d1c3 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConfig.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConfig.java
@@ -28,7 +28,7 @@ public class SessionConfig {
public static final String DEFAULT_PASSWORD = "root";
public static final int DEFAULT_FETCH_SIZE = 5000;
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+ public static final boolean DEFAULT_REDIRECTION_MODE = true;
public static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
public static final int DEFAULT_SESSION_EXECUTOR_THREAD_NUM = 2 * CPU_CORES;
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index b589717bc6..60e94f76df 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -84,7 +84,7 @@ public class SessionPool {
private final String password;
private final int fetchSize;
private final ZoneId zoneId;
- private final boolean enableCacheLeader;
+ private final boolean enableRedirection;
// parameters for Session#open()
private final int connectionTimeoutInMs;
@@ -107,7 +107,7 @@ public class SessionPool {
60_000,
false,
null,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -121,7 +121,7 @@ public class SessionPool {
60_000,
false,
null,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -137,7 +137,7 @@ public class SessionPool {
60_000,
enableCompression,
null,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -152,7 +152,7 @@ public class SessionPool {
60_000,
enableCompression,
null,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -163,7 +163,7 @@ public class SessionPool {
String password,
int maxSize,
boolean enableCompression,
- boolean enableCacheLeader) {
+ boolean enableRedirection) {
this(
host,
port,
@@ -174,7 +174,7 @@ public class SessionPool {
60_000,
enableCompression,
null,
- enableCacheLeader,
+ enableRedirection,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -184,7 +184,7 @@ public class SessionPool {
String password,
int maxSize,
boolean enableCompression,
- boolean enableCacheLeader) {
+ boolean enableRedirection) {
this(
nodeUrls,
user,
@@ -194,7 +194,7 @@ public class SessionPool {
60_000,
enableCompression,
null,
- enableCacheLeader,
+ enableRedirection,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -210,7 +210,7 @@ public class SessionPool {
60_000,
false,
zoneId,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -225,7 +225,7 @@ public class SessionPool {
60_000,
false,
zoneId,
- SessionConfig.DEFAULT_CACHE_LEADER_MODE,
+ SessionConfig.DEFAULT_REDIRECTION_MODE,
SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@@ -240,7 +240,7 @@ public class SessionPool {
long waitToGetSessionTimeoutInMs,
boolean enableCompression,
ZoneId zoneId,
- boolean enableCacheLeader,
+ boolean enableRedirection,
int connectionTimeoutInMs) {
this.maxSize = maxSize;
this.host = host;
@@ -252,7 +252,7 @@ public class SessionPool {
this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
this.enableCompression = enableCompression;
this.zoneId = zoneId;
- this.enableCacheLeader = enableCacheLeader;
+ this.enableRedirection = enableRedirection;
this.connectionTimeoutInMs = connectionTimeoutInMs;
}
@@ -265,7 +265,7 @@ public class SessionPool {
long waitToGetSessionTimeoutInMs,
boolean enableCompression,
ZoneId zoneId,
- boolean enableCacheLeader,
+ boolean enableRedirection,
int connectionTimeoutInMs) {
this.maxSize = maxSize;
this.host = null;
@@ -277,7 +277,7 @@ public class SessionPool {
this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
this.enableCompression = enableCompression;
this.zoneId = zoneId;
- this.enableCacheLeader = enableCacheLeader;
+ this.enableRedirection = enableRedirection;
this.connectionTimeoutInMs = connectionTimeoutInMs;
}
@@ -293,7 +293,7 @@ public class SessionPool {
.password(password)
.fetchSize(fetchSize)
.zoneId(zoneId)
- .enableCacheLeader(enableCacheLeader)
+ .enableRedirection(enableRedirection)
.build();
} else {
// Construct redirect-able Session
@@ -304,7 +304,7 @@ public class SessionPool {
.password(password)
.fetchSize(fetchSize)
.zoneId(zoneId)
- .enableCacheLeader(enableCacheLeader)
+ .enableRedirection(enableRedirection)
.build();
}
return session;
@@ -2297,8 +2297,8 @@ public class SessionPool {
return enableCompression;
}
- public boolean isEnableCacheLeader() {
- return enableCacheLeader;
+ public boolean isEnableRedirection() {
+ return enableRedirection;
}
public int getConnectionTimeoutInMs() {
@@ -2317,7 +2317,7 @@ public class SessionPool {
private long waitToGetSessionTimeoutInMs = 60_000;
private boolean enableCompression = false;
private ZoneId zoneId = null;
- private boolean enableCacheLeader = SessionConfig.DEFAULT_CACHE_LEADER_MODE;
+ private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
private int connectionTimeoutInMs = SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS;
public Builder host(String host) {
@@ -2370,8 +2370,8 @@ public class SessionPool {
return this;
}
- public Builder enableCacheLeader(boolean enableCacheLeader) {
- this.enableCacheLeader = enableCacheLeader;
+ public Builder enableRedirection(boolean enableRedirection) {
+ this.enableRedirection = enableRedirection;
return this;
}
@@ -2392,7 +2392,7 @@ public class SessionPool {
waitToGetSessionTimeoutInMs,
enableCompression,
zoneId,
- enableCacheLeader,
+ enableRedirection,
connectionTimeoutInMs);
} else {
return new SessionPool(
@@ -2404,7 +2404,7 @@ public class SessionPool {
waitToGetSessionTimeoutInMs,
enableCompression,
zoneId,
- enableCacheLeader,
+ enableRedirection,
connectionTimeoutInMs);
}
}
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
index ad5779cb41..48ba3b3dc6 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
@@ -960,7 +960,7 @@ public class SessionCacheLeaderUT {
private MockSessionConnection lastConstructedSessionConnection;
- public MockSession(String host, int rpcPort, boolean enableCacheLeader) {
+ public MockSession(String host, int rpcPort, boolean enableRedirection) {
super(
host,
rpcPort,
@@ -970,7 +970,7 @@ public class SessionCacheLeaderUT {
null,
SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY,
SessionConfig.DEFAULT_MAX_FRAME_SIZE,
- enableCacheLeader,
+ enableRedirection,
SessionConfig.DEFAULT_VERSION);
}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
index f14dea9be8..71d8882608 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterIT.java
@@ -89,7 +89,7 @@ public abstract class ClusterIT extends Cases {
.port(getWriteRpcPort())
.username("root")
.password("root")
- .enableCacheLeader(false)
+ .enableRedirection(false)
.build();
session.open();
TimeUnit.MILLISECONDS.sleep(3000);