You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/12 05:13:02 UTC
[iotdb] branch rel/0.13 updated: [IOTDB-2754] SessionPool auto redirect IoTDB instance (#5267) (#5313)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 5a30d37871 [IOTDB-2754] SessionPool auto redirect IoTDB instance (#5267) (#5313)
5a30d37871 is described below
commit 5a30d37871635c285173c82dbb27d069d5e8764e
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Apr 12 13:12:56 2022 +0800
[IOTDB-2754] SessionPool auto redirect IoTDB instance (#5267) (#5313)
---
docs/UserGuide/API/Programming-Java-Native-API.md | 1 +
.../UserGuide/API/Programming-Java-Native-API.md | 3 +-
.../java/org/apache/iotdb/SessionPoolExample.java | 42 +++--
.../java/org/apache/iotdb/session/Session.java | 67 +++++---
.../org/apache/iotdb/session/pool/SessionPool.java | 178 +++++++++++++++++++--
5 files changed, 241 insertions(+), 50 deletions(-)
diff --git a/docs/UserGuide/API/Programming-Java-Native-API.md b/docs/UserGuide/API/Programming-Java-Native-API.md
index 90b4c1e4a0..260ef3c76a 100644
--- a/docs/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/UserGuide/API/Programming-Java-Native-API.md
@@ -514,6 +514,7 @@ If you can not get a session connection in 60 seconds, there is a warning log bu
If a session has finished an operation, it will be put back to the pool automatically.
If a session connection is broken, the session will be removed automatically and the pool will try
to create a new session and redo the operation.
+You can also specify an url list of multiple reachable nodes when creating a SessionPool, just as you would when creating a Session. To ensure high availability of clients in distributed cluster.
For query operations:
diff --git a/docs/zh/UserGuide/API/Programming-Java-Native-API.md b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
index 5edd930d1f..1f7e378504 100644
--- a/docs/zh/UserGuide/API/Programming-Java-Native-API.md
+++ b/docs/zh/UserGuide/API/Programming-Java-Native-API.md
@@ -498,7 +498,8 @@ void testInsertTablets(Map<String, Tablet> tablets)
如果超过 60s 都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
当一个连接被用完后,他会自动返回池中等待下次被使用;
-当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作;
+你还可以像创建 Session 那样在创建 SessionPool 时指定多个可连接节点的 url,以保证分布式集群中客户端的高可用性。
对于查询操作:
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 230849d25d..23a1895c22 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -32,12 +32,12 @@ import java.util.concurrent.Executors;
public class SessionPoolExample {
- private static SessionPool pool;
+ private static SessionPool sessionPool;
private static ExecutorService service;
- public static void main(String[] args)
- throws StatementExecutionException, IoTDBConnectionException, InterruptedException {
- pool =
+ /** Build a custom SessionPool for this example */
+ private static void constructCustomSessionPool() {
+ sessionPool =
new SessionPool.Builder()
.host("127.0.0.1")
.port(6667)
@@ -45,13 +45,33 @@ public class SessionPoolExample {
.password("root")
.maxSize(3)
.build();
- service = Executors.newFixedThreadPool(10);
+ }
+ /** Build a redirect-able SessionPool for this example */
+ private static void constructRedirectSessionPool() {
+ List<String> nodeUrls = new ArrayList<>();
+ nodeUrls.add("127.0.0.1:6667");
+ nodeUrls.add("127.0.0.1:6668");
+ sessionPool =
+ new SessionPool.Builder()
+ .nodeUrls(nodeUrls)
+ .user("root")
+ .password("root")
+ .maxSize(3)
+ .build();
+ }
+
+ public static void main(String[] args)
+ throws StatementExecutionException, IoTDBConnectionException, InterruptedException {
+ // Choose the SessionPool you going to use
+ constructRedirectSessionPool();
+
+ service = Executors.newFixedThreadPool(10);
insertRecord();
queryByRowRecord();
Thread.sleep(1000);
queryByIterator();
- pool.close();
+ sessionPool.close();
service.shutdown();
}
@@ -72,7 +92,7 @@ public class SessionPoolExample {
values.add(1L);
values.add(2L);
values.add(3L);
- pool.insertRecord(deviceId, time, measurements, types, values);
+ sessionPool.insertRecord(deviceId, time, measurements, types, values);
}
}
@@ -82,7 +102,7 @@ public class SessionPoolExample {
() -> {
SessionDataSetWrapper wrapper = null;
try {
- wrapper = pool.executeQueryStatement("select * from root.sg1.d1");
+ wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1");
System.out.println(wrapper.getColumnNames());
System.out.println(wrapper.getColumnTypes());
while (wrapper.hasNext()) {
@@ -92,7 +112,7 @@ public class SessionPoolExample {
e.printStackTrace();
} finally {
// remember to close data set finally!
- pool.closeResultSet(wrapper);
+ sessionPool.closeResultSet(wrapper);
}
});
}
@@ -104,7 +124,7 @@ public class SessionPoolExample {
() -> {
SessionDataSetWrapper wrapper = null;
try {
- wrapper = pool.executeQueryStatement("select * from root.sg1.d1");
+ wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1");
// get DataIterator like JDBC
DataIterator dataIterator = wrapper.iterator();
System.out.println(wrapper.getColumnNames());
@@ -120,7 +140,7 @@ public class SessionPoolExample {
e.printStackTrace();
} finally {
// remember to close data set finally!
- pool.closeResultSet(wrapper);
+ sessionPool.closeResultSet(wrapper);
}
});
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9a31211403..b7bf9d9185 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1022,8 +1022,11 @@ public class Session {
genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, false);
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException ignored) {
- // ignore
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1056,8 +1059,11 @@ public class Session {
genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList, true);
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException ignored) {
- // ignore
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1139,8 +1145,11 @@ public class Session {
genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, false);
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException ignored) {
- // ignore
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1174,8 +1183,11 @@ public class Session {
genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList, true);
try {
defaultSessionConnection.insertRecords(request);
- } catch (RedirectException ignored) {
- // ignore
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1657,8 +1669,11 @@ public class Session {
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, false);
try {
defaultSessionConnection.insertTablets(request);
- } catch (RedirectException ignored) {
- // ignored
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -1692,8 +1707,11 @@ public class Session {
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted, true);
try {
defaultSessionConnection.insertTablets(request);
- } catch (RedirectException ignored) {
- // ignored
+ } catch (RedirectException e) {
+ Map<String, EndPoint> deviceEndPointMap = e.getDeviceEndPointMap();
+ for (Map.Entry<String, EndPoint> deviceEndPointEntry : deviceEndPointMap.entrySet()) {
+ handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue());
+ }
}
}
}
@@ -2470,7 +2488,7 @@ public class Session {
private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
private Version version = Config.DEFAULT_VERSION;
- List<String> nodeUrls = null;
+ private List<String> nodeUrls = null;
public Builder host(String host) {
this.host = host;
@@ -2535,16 +2553,19 @@ public class Session {
}
if (nodeUrls != null) {
- return new Session(
- nodeUrls,
- username,
- password,
- fetchSize,
- zoneId,
- thriftDefaultBufferSize,
- thriftMaxFrameSize,
- enableCacheLeader,
- version);
+ Session newSession =
+ new Session(
+ nodeUrls,
+ username,
+ password,
+ fetchSize,
+ zoneId,
+ thriftDefaultBufferSize,
+ thriftMaxFrameSize,
+ enableCacheLeader,
+ version);
+ newSession.setEnableQueryRedirection(true);
+ return newSession;
}
return new Session(
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 900f4185e4..37249b9971 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -93,6 +93,9 @@ public class SessionPool {
// whether the queue is closed.
private boolean closed;
+ // Redirect-able SessionPool
+ private final List<String> nodeUrls;
+
public SessionPool(String host, int port, String user, String password, int maxSize) {
this(
host,
@@ -108,6 +111,20 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(List<String> nodeUrls, String user, String password, int maxSize) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ false,
+ null,
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+ }
+
public SessionPool(
String host, int port, String user, String password, int maxSize, boolean enableCompression) {
this(
@@ -124,6 +141,21 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(
+ List<String> nodeUrls, String user, String password, int maxSize, boolean enableCompression) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ enableCompression,
+ null,
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+ }
+
public SessionPool(
String host,
int port,
@@ -146,6 +178,26 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(
+ List<String> nodeUrls,
+ String user,
+ String password,
+ int maxSize,
+ boolean enableCompression,
+ boolean enableCacheLeader) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ enableCompression,
+ null,
+ enableCacheLeader,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+ }
+
public SessionPool(
String host, int port, String user, String password, int maxSize, ZoneId zoneId) {
this(
@@ -162,6 +214,21 @@ public class SessionPool {
Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
+ public SessionPool(
+ List<String> nodeUrls, String user, String password, int maxSize, ZoneId zoneId) {
+ this(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ Config.DEFAULT_FETCH_SIZE,
+ 60_000,
+ false,
+ zoneId,
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
+ }
+
@SuppressWarnings("squid:S107")
public SessionPool(
String host,
@@ -178,6 +245,32 @@ public class SessionPool {
this.maxSize = maxSize;
this.host = host;
this.port = port;
+ this.nodeUrls = null;
+ this.user = user;
+ this.password = password;
+ this.fetchSize = fetchSize;
+ this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
+ this.enableCompression = enableCompression;
+ this.zoneId = zoneId;
+ this.enableCacheLeader = enableCacheLeader;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ }
+
+ public SessionPool(
+ List<String> nodeUrls,
+ String user,
+ String password,
+ int maxSize,
+ int fetchSize,
+ long waitToGetSessionTimeoutInMs,
+ boolean enableCompression,
+ ZoneId zoneId,
+ boolean enableCacheLeader,
+ int connectionTimeoutInMs) {
+ this.maxSize = maxSize;
+ this.host = null;
+ this.port = -1;
+ this.nodeUrls = nodeUrls;
this.user = user;
this.password = password;
this.fetchSize = fetchSize;
@@ -188,6 +281,35 @@ public class SessionPool {
this.connectionTimeoutInMs = connectionTimeoutInMs;
}
+ private Session constructNewSession() {
+ Session session;
+ if (nodeUrls == null) {
+ // Construct custom Session
+ session =
+ new Session.Builder()
+ .host(host)
+ .port(port)
+ .username(user)
+ .password(password)
+ .fetchSize(fetchSize)
+ .zoneId(zoneId)
+ .enableCacheLeader(enableCacheLeader)
+ .build();
+ } else {
+ // Construct redirect-able Session
+ session =
+ new Session.Builder()
+ .nodeUrls(nodeUrls)
+ .username(user)
+ .password(password)
+ .fetchSize(fetchSize)
+ .zoneId(zoneId)
+ .enableCacheLeader(enableCacheLeader)
+ .build();
+ }
+ return session;
+ }
+
// if this method throws an exception, either the server is broken, or the ip/port/user/password
// is incorrect.
@SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
@@ -254,9 +376,15 @@ public class SessionPool {
if (shouldCreate) {
// create a new one.
if (logger.isDebugEnabled()) {
- logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password);
+ if (nodeUrls == null) {
+ logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password);
+ } else {
+ logger.debug("Create a new redirect Session {}, {}, {}", nodeUrls, user, password);
+ }
}
- session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+
+ session = constructNewSession();
+
try {
session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
@@ -352,7 +480,7 @@ public class SessionPool {
@SuppressWarnings({"squid:S2446"})
private void tryConstructNewSession() {
- Session session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+ Session session = constructNewSession();
try {
session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
@@ -2177,6 +2305,7 @@ public class SessionPool {
private String host = Config.DEFAULT_HOST;
private int port = Config.DEFAULT_PORT;
+ private List<String> nodeUrls = null;
private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE;
private String user = Config.DEFAULT_USER;
private String password = Config.DEFAULT_PASSWORD;
@@ -2197,6 +2326,11 @@ public class SessionPool {
return this;
}
+ public Builder nodeUrls(List<String> nodeUrls) {
+ this.nodeUrls = nodeUrls;
+ return this;
+ }
+
public Builder maxSize(int maxSize) {
this.maxSize = maxSize;
return this;
@@ -2243,18 +2377,32 @@ public class SessionPool {
}
public SessionPool build() {
- return new SessionPool(
- host,
- port,
- user,
- password,
- maxSize,
- fetchSize,
- waitToGetSessionTimeoutInMs,
- enableCompression,
- zoneId,
- enableCacheLeader,
- connectionTimeoutInMs);
+ if (nodeUrls == null) {
+ return new SessionPool(
+ host,
+ port,
+ user,
+ password,
+ maxSize,
+ fetchSize,
+ waitToGetSessionTimeoutInMs,
+ enableCompression,
+ zoneId,
+ enableCacheLeader,
+ connectionTimeoutInMs);
+ } else {
+ return new SessionPool(
+ nodeUrls,
+ user,
+ password,
+ maxSize,
+ fetchSize,
+ waitToGetSessionTimeoutInMs,
+ enableCompression,
+ zoneId,
+ enableCacheLeader,
+ connectionTimeoutInMs);
+ }
}
}
}