You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/19 13:19:46 UTC

[iotdb] 01/01: optimize session creation

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 049c0984683e600464d7e0d5f4cb2276bdc0688c
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Aug 19 19:43:02 2021 +0800

    optimize session creation
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 140 ++++++++++-----------
 1 file changed, 70 insertions(+), 70 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 79a539d..ed04509 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -174,7 +174,6 @@ public class SessionPool {
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
   // is incorrect.
-
   @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
   private Session getSession() throws IoTDBConnectionException {
     Session session = queue.poll();
@@ -183,91 +182,92 @@ public class SessionPool {
     }
     if (session != null) {
       return session;
-    } else {
-      long start = System.currentTimeMillis();
-      boolean canCreate = false;
+    }
+
+    boolean shouldCreate = false;
+
+    long start = System.currentTimeMillis();
+    while (session == null) {
       synchronized (this) {
         if (size < maxSize) {
           // we can create more session
           size++;
-          canCreate = true;
+          shouldCreate = true;
           // but we do it after skip synchronized block because connection a session is time
           // consuming.
+          break;
         }
-      }
-      if (canCreate) {
-        // create a new one.
-        if (logger.isDebugEnabled()) {
-          logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
-        }
-        session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+
+        // we have to wait for someone returns a session.
         try {
-          session.open(enableCompression);
-          // avoid someone has called close() the session pool
-          synchronized (this) {
-            if (closed) {
-              // have to release the connection...
-              session.close();
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            } else {
-              return session;
-            }
+          if (logger.isDebugEnabled()) {
+            logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
-        } catch (IoTDBConnectionException e) {
-          // if exception, we will throw the exception.
-          // Meanwhile, we have to set size--
-          synchronized (this) {
-            size--;
-            // we do not need to notifyAll as any waited thread can continue to work after waked up.
-            this.notify();
-            if (logger.isDebugEnabled()) {
-              logger.debug("open session failed, reduce the count and notify others...");
+          this.wait(1000);
+          long time = timeout < 60_000 ? timeout : 60_000;
+          if (System.currentTimeMillis() - start > time) {
+            logger.warn(
+                "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                (System.currentTimeMillis() - start) / 1000,
+                ip,
+                port,
+                user,
+                password);
+            logger.warn(
+                "current occupied size {}, queue size {}, considered size {} ",
+                occupied.size(),
+                queue.size(),
+                size);
+            if (System.currentTimeMillis() - start > timeout) {
+              throw new IoTDBConnectionException(
+                  String.format("timeout to get a connection from %s:%s", ip, port));
             }
           }
-          throw e;
+        } catch (InterruptedException e) {
+          logger.error("the SessionPool is damaged", e);
+          Thread.currentThread().interrupt();
         }
-      } else {
-        while (session == null) {
-          synchronized (this) {
-            if (closed) {
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            }
-            // we have to wait for someone returns a session.
-            try {
-              if (logger.isDebugEnabled()) {
-                logger.debug(
-                    "no more sessions can be created, wait... queue.size={}", queue.size());
-              }
-              this.wait(1000);
-              long time = timeout < 60_000 ? timeout : 60_000;
-              if (System.currentTimeMillis() - start > time) {
-                logger.warn(
-                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
-                    (System.currentTimeMillis() - start) / 1000,
-                    ip,
-                    port,
-                    user,
-                    password);
-                logger.warn(
-                    "current occupied size {}, queue size {}, considered size {} ",
-                    occupied.size(),
-                    queue.size(),
-                    size);
-                if (System.currentTimeMillis() - start > timeout) {
-                  throw new IoTDBConnectionException(
-                      String.format("timeout to get a connection from %s:%s", ip, port));
-                }
-              }
-            } catch (InterruptedException e) {
-              logger.error("the SessionPool is damaged", e);
-              Thread.currentThread().interrupt();
-            }
-            session = queue.poll();
+
+        session = queue.poll();
+
+        if (closed) {
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+      }
+    }
+
+    if (shouldCreate) {
+      // create a new one.
+      if (logger.isDebugEnabled()) {
+        logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+      try {
+        session.open(enableCompression);
+        // avoid someone has called close() the session pool
+        synchronized (this) {
+          if (closed) {
+            // have to release the connection...
+            session.close();
+            throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
           }
         }
-        return session;
+      } catch (IoTDBConnectionException e) {
+        // if exception, we will throw the exception.
+        // Meanwhile, we have to set size--
+        synchronized (this) {
+          size--;
+          // we do not need to notifyAll as any waited thread can continue to work after waked up.
+          this.notify();
+          if (logger.isDebugEnabled()) {
+            logger.debug("open session failed, reduce the count and notify others...");
+          }
+        }
+        throw e;
       }
     }
+
+    return session;
   }
 
   public int currentAvailableSize() {