You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/19 13:19:45 UTC

[iotdb] branch session-pool-optimize created (now 049c098)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch session-pool-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 049c098  optimize session creation

This branch includes the following new commits:

     new 049c098  optimize session creation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: optimize session creation

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 049c0984683e600464d7e0d5f4cb2276bdc0688c
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Aug 19 19:43:02 2021 +0800

    optimize session creation
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 140 ++++++++++-----------
 1 file changed, 70 insertions(+), 70 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 79a539d..ed04509 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -174,7 +174,6 @@ public class SessionPool {
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
   // is incorrect.
-
   @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
   private Session getSession() throws IoTDBConnectionException {
     Session session = queue.poll();
@@ -183,91 +182,92 @@ public class SessionPool {
     }
     if (session != null) {
       return session;
-    } else {
-      long start = System.currentTimeMillis();
-      boolean canCreate = false;
+    }
+
+    boolean shouldCreate = false;
+
+    long start = System.currentTimeMillis();
+    while (session == null) {
       synchronized (this) {
         if (size < maxSize) {
           // we can create more session
           size++;
-          canCreate = true;
+          shouldCreate = true;
           // but we do it after skip synchronized block because connection a session is time
           // consuming.
+          break;
         }
-      }
-      if (canCreate) {
-        // create a new one.
-        if (logger.isDebugEnabled()) {
-          logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
-        }
-        session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+
+        // we have to wait for someone returns a session.
         try {
-          session.open(enableCompression);
-          // avoid someone has called close() the session pool
-          synchronized (this) {
-            if (closed) {
-              // have to release the connection...
-              session.close();
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            } else {
-              return session;
-            }
+          if (logger.isDebugEnabled()) {
+            logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
-        } catch (IoTDBConnectionException e) {
-          // if exception, we will throw the exception.
-          // Meanwhile, we have to set size--
-          synchronized (this) {
-            size--;
-            // we do not need to notifyAll as any waited thread can continue to work after waked up.
-            this.notify();
-            if (logger.isDebugEnabled()) {
-              logger.debug("open session failed, reduce the count and notify others...");
+          this.wait(1000);
+          long time = timeout < 60_000 ? timeout : 60_000;
+          if (System.currentTimeMillis() - start > time) {
+            logger.warn(
+                "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                (System.currentTimeMillis() - start) / 1000,
+                ip,
+                port,
+                user,
+                password);
+            logger.warn(
+                "current occupied size {}, queue size {}, considered size {} ",
+                occupied.size(),
+                queue.size(),
+                size);
+            if (System.currentTimeMillis() - start > timeout) {
+              throw new IoTDBConnectionException(
+                  String.format("timeout to get a connection from %s:%s", ip, port));
             }
           }
-          throw e;
+        } catch (InterruptedException e) {
+          logger.error("the SessionPool is damaged", e);
+          Thread.currentThread().interrupt();
         }
-      } else {
-        while (session == null) {
-          synchronized (this) {
-            if (closed) {
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            }
-            // we have to wait for someone returns a session.
-            try {
-              if (logger.isDebugEnabled()) {
-                logger.debug(
-                    "no more sessions can be created, wait... queue.size={}", queue.size());
-              }
-              this.wait(1000);
-              long time = timeout < 60_000 ? timeout : 60_000;
-              if (System.currentTimeMillis() - start > time) {
-                logger.warn(
-                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
-                    (System.currentTimeMillis() - start) / 1000,
-                    ip,
-                    port,
-                    user,
-                    password);
-                logger.warn(
-                    "current occupied size {}, queue size {}, considered size {} ",
-                    occupied.size(),
-                    queue.size(),
-                    size);
-                if (System.currentTimeMillis() - start > timeout) {
-                  throw new IoTDBConnectionException(
-                      String.format("timeout to get a connection from %s:%s", ip, port));
-                }
-              }
-            } catch (InterruptedException e) {
-              logger.error("the SessionPool is damaged", e);
-              Thread.currentThread().interrupt();
-            }
-            session = queue.poll();
+
+        session = queue.poll();
+
+        if (closed) {
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+      }
+    }
+
+    if (shouldCreate) {
+      // create a new one.
+      if (logger.isDebugEnabled()) {
+        logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+      try {
+        session.open(enableCompression);
+        // avoid someone has called close() the session pool
+        synchronized (this) {
+          if (closed) {
+            // have to release the connection...
+            session.close();
+            throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
           }
         }
-        return session;
+      } catch (IoTDBConnectionException e) {
+        // if exception, we will throw the exception.
+        // Meanwhile, we have to set size--
+        synchronized (this) {
+          size--;
+          // we do not need to notifyAll as any waited thread can continue to work after waked up.
+          this.notify();
+          if (logger.isDebugEnabled()) {
+            logger.debug("open session failed, reduce the count and notify others...");
+          }
+        }
+        throw e;
       }
     }
+
+    return session;
   }
 
   public int currentAvailableSize() {