You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/24 03:42:24 UTC

[iotdb] 02/04: removeSession -> tryConstructNewSession

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization-0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ba668abcc854e751df567ae041305ab9f665e272
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 11:23:57 2021 +0800

    removeSession -> tryConstructNewSession
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 34 ++++++++++++++++------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index ed04509..d8b2991 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -325,7 +325,7 @@ public class SessionPool {
     try {
       wrapper.sessionDataSet.closeOperationHandle();
     } catch (IoTDBConnectionException | StatementExecutionException e) {
-      removeSession();
+      tryConstructNewSession();
       putback = false;
     } finally {
       Session session = occupied.remove(wrapper.session);
@@ -336,13 +336,29 @@ public class SessionPool {
   }
 
   @SuppressWarnings({"squid:S2446"})
-  private synchronized void removeSession() {
-    logger.warn("Remove a broken Session {}, {}, {}", ip, port, user);
-    size--;
-    // we do not need to notifyAll as any waited thread can continue to work after waked up.
-    this.notify();
-    if (logger.isDebugEnabled()) {
-      logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
+  private void tryConstructNewSession() {
+    Session session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+    try {
+      session.open(enableCompression);
+      // avoid someone has called close() the session pool
+      synchronized (this) {
+        if (closed) {
+          // have to release the connection...
+          session.close();
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+        queue.push(session);
+        this.notify();
+      }
+    } catch (IoTDBConnectionException e) {
+      synchronized (this) {
+        size--;
+        // we do not need to notifyAll as any waited thread can continue to work after waked up.
+        this.notify();
+        if (logger.isDebugEnabled()) {
+          logger.debug("open session failed, reduce the count and notify others...");
+        }
+      }
     }
   }
 
@@ -360,7 +376,7 @@ public class SessionPool {
   private void cleanSessionAndMayThrowConnectionException(
       Session session, int times, IoTDBConnectionException e) throws IoTDBConnectionException {
     closeSession(session);
-    removeSession();
+    tryConstructNewSession();
     if (times == FINAL_RETRY) {
       throw new IoTDBConnectionException(
           String.format(