You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/24 08:19:42 UTC
[iotdb] 02/05: removeSession -> tryConstructNewSession
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4dcf1e273078a9651bb94433aa8cfbab3d49ef08
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 15:57:06 2021 +0800
removeSession -> tryConstructNewSession
---
.../org/apache/iotdb/session/pool/SessionPool.java | 34 ++++++++++++++++------
1 file changed, 25 insertions(+), 9 deletions(-)
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 6939438..e4b712d 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -325,7 +325,7 @@ public class SessionPool {
try {
wrapper.sessionDataSet.closeOperationHandle();
} catch (IoTDBConnectionException | StatementExecutionException e) {
- removeSession();
+ tryConstructNewSession();
putback = false;
} finally {
Session session = occupied.remove(wrapper.session);
@@ -336,13 +336,29 @@ public class SessionPool {
}
@SuppressWarnings({"squid:S2446"})
- private synchronized void removeSession() {
- logger.warn("Remove a broken Session {}, {}, {}", host, port, user);
- size--;
- // we do not need to notifyAll as any waited thread can continue to work after waked up.
- this.notify();
- if (logger.isDebugEnabled()) {
- logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
+ private void tryConstructNewSession() {
+ Session session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+ try {
+ session.open(enableCompression);
+ // guard against the session pool having been closed in the meantime
+ synchronized (this) {
+ if (closed) {
+ // have to release the connection...
+ session.close();
+ throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+ }
+ queue.push(session);
+ this.notify();
+ }
+ } catch (IoTDBConnectionException e) {
+ synchronized (this) {
+ size--;
+ // we do not need to notifyAll, as any waiting thread can continue to work after being woken up.
+ this.notify();
+ if (logger.isDebugEnabled()) {
+ logger.debug("open session failed, reduce the count and notify others...");
+ }
+ }
}
}
@@ -360,7 +376,7 @@ public class SessionPool {
private void cleanSessionAndMayThrowConnectionException(
Session session, int times, IoTDBConnectionException e) throws IoTDBConnectionException {
closeSession(session);
- removeSession();
+ tryConstructNewSession();
if (times == FINAL_RETRY) {
throw new IoTDBConnectionException(
String.format(