You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/24 03:42:22 UTC

[iotdb] branch session-pool-optimization-0.12 created (now eb4aa53)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch session-pool-optimization-0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at eb4aa53  add parameter connectionTimeoutInMs for SessionPool

This branch includes the following new commits:

     new 44d3d49  optimize session creation
     new ba668ab  removeSession -> tryConstructNewSession
     new ead6631  rename timeout in Session
     new eb4aa53  add parameter connectionTimeoutInMs for SessionPool

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 04/04: add parameter connectionTimeoutInMs for SessionPool

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization-0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb4aa5393a8071374bdbffbd3d8bbc91463955ad
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 11:41:43 2021 +0800

    add parameter connectionTimeoutInMs for SessionPool
---
 .../java/org/apache/iotdb/session/Session.java     |  2 +-
 .../org/apache/iotdb/session/pool/SessionPool.java | 59 ++++++++------
 .../apache/iotdb/session/pool/SessionPoolTest.java | 95 ++++++++++++++++++++--
 3 files changed, 124 insertions(+), 32 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 20079c9..06b3536 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -314,7 +314,7 @@ public class Session {
     open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
-  private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+  public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException {
     if (!isClosed) {
       return;
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index d8b2991..5fa2c7e 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -64,22 +64,27 @@ public class SessionPool {
   private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
   public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
   public static final String CLOSE_THE_SESSION_FAILED = "close the session failed.";
-  private static int RETRY = 3;
-  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+
+  private static final int RETRY = 3;
+  private static final int FINAL_RETRY = RETRY - 1;
+
+  private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
   // for session whose resultSet is not released.
-  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
   private int size = 0;
   private int maxSize = 0;
-  private String ip;
-  private int port;
-  private String user;
-  private String password;
-  private int fetchSize;
-  private long timeout; // ms
-  private static int FINAL_RETRY = RETRY - 1;
-  private boolean enableCompression;
-  private boolean enableCacheLeader;
-  private ZoneId zoneId;
+
+  private final String ip;
+  private final int port;
+  private final String user;
+  private final String password;
+  private final int fetchSize;
+  private final long waitToGetSessionTimeoutInMs;
+  private final int connectionTimeoutInMs;
+  private final boolean enableCompression;
+  private final boolean enableCacheLeader;
+  private final ZoneId zoneId;
 
   private boolean closed; // whether the queue is closed.
 
@@ -94,7 +99,8 @@ public class SessionPool {
         60_000,
         false,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -109,7 +115,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -130,7 +137,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        enableCacheLeader);
+        enableCacheLeader,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -145,7 +153,8 @@ public class SessionPool {
         60_000,
         false,
         zoneId,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   @SuppressWarnings("squid:S107")
@@ -156,20 +165,22 @@ public class SessionPool {
       String password,
       int maxSize,
       int fetchSize,
-      long timeout,
+      long waitToGetSessionTimeoutInMs,
       boolean enableCompression,
       ZoneId zoneId,
-      boolean enableCacheLeader) {
+      boolean enableCacheLeader,
+      int connectionTimeoutInMs) {
     this.maxSize = maxSize;
     this.ip = ip;
     this.port = port;
     this.user = user;
     this.password = password;
     this.fetchSize = fetchSize;
-    this.timeout = timeout;
+    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableCacheLeader = enableCacheLeader;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
@@ -204,7 +215,7 @@ public class SessionPool {
             logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
           this.wait(1000);
-          long time = timeout < 60_000 ? timeout : 60_000;
+          long time = waitToGetSessionTimeoutInMs < 60_000 ? waitToGetSessionTimeoutInMs : 60_000;
           if (System.currentTimeMillis() - start > time) {
             logger.warn(
                 "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
@@ -218,7 +229,7 @@ public class SessionPool {
                 occupied.size(),
                 queue.size(),
                 size);
-            if (System.currentTimeMillis() - start > timeout) {
+            if (System.currentTimeMillis() - start > waitToGetSessionTimeoutInMs) {
               throw new IoTDBConnectionException(
                   String.format("timeout to get a connection from %s:%s", ip, port));
             }
@@ -243,7 +254,7 @@ public class SessionPool {
       }
       session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
       try {
-        session.open(enableCompression);
+        session.open(enableCompression, connectionTimeoutInMs);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -339,7 +350,7 @@ public class SessionPool {
   private void tryConstructNewSession() {
     Session session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
     try {
-      session.open(enableCompression);
+      session.open(enableCompression, connectionTimeoutInMs);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 6b7d68f..18e182e 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Config;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -227,7 +228,18 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            6000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     SessionDataSetWrapper wrapper = null;
     try {
@@ -243,7 +255,19 @@ public class SessionPoolTest {
       EnvironmentUtils.stopDaemon();
 
       EnvironmentUtils.reactiveDaemon();
-      pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+      pool =
+          new SessionPool(
+              "127.0.0.1",
+              6667,
+              "root",
+              "root",
+              3,
+              1,
+              6000,
+              false,
+              null,
+              false,
+              Config.DEFAULT_CONNECTION_TIMEOUT_MS);
       correctQuery(pool);
       pool.close();
       return;
@@ -263,7 +287,19 @@ public class SessionPoolTest {
         pool.close();
         EnvironmentUtils.stopDaemon();
         EnvironmentUtils.reactiveDaemon();
-        pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        pool =
+            new SessionPool(
+                "127.0.0.1",
+                6667,
+                "root",
+                "root",
+                3,
+                1,
+                6000,
+                false,
+                null,
+                false,
+                Config.DEFAULT_CONNECTION_TIMEOUT_MS);
         correctQuery(pool);
         pool.close();
       } catch (StatementExecutionException es) {
@@ -282,7 +318,18 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestartButDataIsGotten() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     assertEquals(1, pool.currentAvailableSize());
     SessionDataSetWrapper wrapper = null;
@@ -308,12 +355,35 @@ public class SessionPoolTest {
   @Test
   public void restart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     // stop the server.
     pool.close();
     EnvironmentUtils.stopDaemon();
-    pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+    pool =
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     // all this ten data will fail.
     write10Data(pool, false);
     // restart the server
@@ -343,7 +413,18 @@ public class SessionPoolTest {
   @Test
   public void testClose() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     pool.close();
     try {
       pool.insertRecord(

[iotdb] 01/04: optimize session creation

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization-0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 44d3d493a08556ead510cbb4d80fb5ea651a1b42
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Aug 19 19:43:02 2021 +0800

    optimize session creation
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 140 ++++++++++-----------
 1 file changed, 70 insertions(+), 70 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 79a539d..ed04509 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -174,7 +174,6 @@ public class SessionPool {
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
   // is incorrect.
-
   @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
   private Session getSession() throws IoTDBConnectionException {
     Session session = queue.poll();
@@ -183,91 +182,92 @@ public class SessionPool {
     }
     if (session != null) {
       return session;
-    } else {
-      long start = System.currentTimeMillis();
-      boolean canCreate = false;
+    }
+
+    boolean shouldCreate = false;
+
+    long start = System.currentTimeMillis();
+    while (session == null) {
       synchronized (this) {
         if (size < maxSize) {
           // we can create more session
           size++;
-          canCreate = true;
+          shouldCreate = true;
           // but we do it after skip synchronized block because connection a session is time
           // consuming.
+          break;
         }
-      }
-      if (canCreate) {
-        // create a new one.
-        if (logger.isDebugEnabled()) {
-          logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
-        }
-        session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+
+        // we have to wait for someone returns a session.
         try {
-          session.open(enableCompression);
-          // avoid someone has called close() the session pool
-          synchronized (this) {
-            if (closed) {
-              // have to release the connection...
-              session.close();
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            } else {
-              return session;
-            }
+          if (logger.isDebugEnabled()) {
+            logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
-        } catch (IoTDBConnectionException e) {
-          // if exception, we will throw the exception.
-          // Meanwhile, we have to set size--
-          synchronized (this) {
-            size--;
-            // we do not need to notifyAll as any waited thread can continue to work after waked up.
-            this.notify();
-            if (logger.isDebugEnabled()) {
-              logger.debug("open session failed, reduce the count and notify others...");
+          this.wait(1000);
+          long time = timeout < 60_000 ? timeout : 60_000;
+          if (System.currentTimeMillis() - start > time) {
+            logger.warn(
+                "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                (System.currentTimeMillis() - start) / 1000,
+                ip,
+                port,
+                user,
+                password);
+            logger.warn(
+                "current occupied size {}, queue size {}, considered size {} ",
+                occupied.size(),
+                queue.size(),
+                size);
+            if (System.currentTimeMillis() - start > timeout) {
+              throw new IoTDBConnectionException(
+                  String.format("timeout to get a connection from %s:%s", ip, port));
             }
           }
-          throw e;
+        } catch (InterruptedException e) {
+          logger.error("the SessionPool is damaged", e);
+          Thread.currentThread().interrupt();
         }
-      } else {
-        while (session == null) {
-          synchronized (this) {
-            if (closed) {
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            }
-            // we have to wait for someone returns a session.
-            try {
-              if (logger.isDebugEnabled()) {
-                logger.debug(
-                    "no more sessions can be created, wait... queue.size={}", queue.size());
-              }
-              this.wait(1000);
-              long time = timeout < 60_000 ? timeout : 60_000;
-              if (System.currentTimeMillis() - start > time) {
-                logger.warn(
-                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
-                    (System.currentTimeMillis() - start) / 1000,
-                    ip,
-                    port,
-                    user,
-                    password);
-                logger.warn(
-                    "current occupied size {}, queue size {}, considered size {} ",
-                    occupied.size(),
-                    queue.size(),
-                    size);
-                if (System.currentTimeMillis() - start > timeout) {
-                  throw new IoTDBConnectionException(
-                      String.format("timeout to get a connection from %s:%s", ip, port));
-                }
-              }
-            } catch (InterruptedException e) {
-              logger.error("the SessionPool is damaged", e);
-              Thread.currentThread().interrupt();
-            }
-            session = queue.poll();
+
+        session = queue.poll();
+
+        if (closed) {
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+      }
+    }
+
+    if (shouldCreate) {
+      // create a new one.
+      if (logger.isDebugEnabled()) {
+        logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+      try {
+        session.open(enableCompression);
+        // avoid someone has called close() the session pool
+        synchronized (this) {
+          if (closed) {
+            // have to release the connection...
+            session.close();
+            throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
           }
         }
-        return session;
+      } catch (IoTDBConnectionException e) {
+        // if exception, we will throw the exception.
+        // Meanwhile, we have to set size--
+        synchronized (this) {
+          size--;
+          // we do not need to notifyAll as any waited thread can continue to work after waked up.
+          this.notify();
+          if (logger.isDebugEnabled()) {
+            logger.debug("open session failed, reduce the count and notify others...");
+          }
+        }
+        throw e;
       }
     }
+
+    return session;
   }
 
   public int currentAvailableSize() {

[iotdb] 03/04: rename timeout in Session

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization-0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ead6631ab5d731e976aac070264e6f7d5d70f198
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 11:26:04 2021 +0800

    rename timeout in Session
---
 .../src/main/java/org/apache/iotdb/SessionExample.java |  2 +-
 .../main/java/org/apache/iotdb/session/Session.java    | 18 +++++++++---------
 .../test/java/org/apache/iotdb/session/SessionUT.java  |  6 +++---
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index a73c90c..623f813 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -659,7 +659,7 @@ public class SessionExample {
 
   private static void setTimeout() throws StatementExecutionException {
     Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000);
-    tempSession.setTimeout(60000);
+    tempSession.setQueryTimeout(60000);
   }
 
   private static void createClusterSession() throws IoTDBConnectionException {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 66726e9..20079c9 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -81,7 +81,7 @@ public class Session {
    * Timeout of query can be set by users. If not set, default value 0 will be used, which will use
    * server configuration.
    */
-  private long timeout = 0;
+  private long queryTimeoutInMs = 0;
 
   protected boolean enableRPCCompression;
   protected int connectionTimeoutInMs;
@@ -167,7 +167,7 @@ public class Session {
         Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
         Config.DEFAULT_MAX_FRAME_SIZE,
         Config.DEFAULT_CACHE_LEADER_MODE);
-    this.timeout = timeoutInMs;
+    this.queryTimeoutInMs = timeoutInMs;
   }
 
   public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
@@ -576,18 +576,18 @@ public class Session {
 
   public boolean checkTimeseriesExists(String path)
       throws IoTDBConnectionException, StatementExecutionException {
-    return defaultSessionConnection.checkTimeseriesExists(path, timeout);
+    return defaultSessionConnection.checkTimeseriesExists(path, queryTimeoutInMs);
   }
 
-  public void setTimeout(long timeoutInMs) throws StatementExecutionException {
+  public void setQueryTimeout(long timeoutInMs) throws StatementExecutionException {
     if (timeoutInMs < 0) {
       throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
     }
-    this.timeout = timeoutInMs;
+    this.queryTimeoutInMs = timeoutInMs;
   }
 
-  public long getTimeout() {
-    return timeout;
+  public long getQueryTimeout() {
+    return queryTimeoutInMs;
   }
 
   /**
@@ -598,7 +598,7 @@ public class Session {
    */
   public SessionDataSet executeQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
-    return executeStatementMayRedirect(sql, timeout);
+    return executeStatementMayRedirect(sql, queryTimeoutInMs);
   }
 
   /**
@@ -640,7 +640,7 @@ public class Session {
             e.getEndPoint());
         // retry
         try {
-          return defaultSessionConnection.executeQueryStatement(sql, timeout);
+          return defaultSessionConnection.executeQueryStatement(sql, queryTimeoutInMs);
         } catch (RedirectException redirectException) {
           logger.error("{} redirect twice", sql, redirectException);
           throw new StatementExecutionException(sql + " redirect twice, please try again.");
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionUT.java b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
index bd0ed59..86a2c5b 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
@@ -195,8 +195,8 @@ public class SessionUT {
   @Test
   public void setTimeout() throws StatementExecutionException {
     session = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
-    Assert.assertEquals(20000, session.getTimeout());
-    session.setTimeout(60000);
-    Assert.assertEquals(60000, session.getTimeout());
+    Assert.assertEquals(20000, session.getQueryTimeout());
+    session.setQueryTimeout(60000);
+    Assert.assertEquals(60000, session.getQueryTimeout());
   }
 }

[iotdb] 02/04: removeSession -> tryConstructNewSession

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization-0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ba668abcc854e751df567ae041305ab9f665e272
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 11:23:57 2021 +0800

    removeSession -> tryConstructNewSession
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 34 ++++++++++++++++------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index ed04509..d8b2991 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -325,7 +325,7 @@ public class SessionPool {
     try {
       wrapper.sessionDataSet.closeOperationHandle();
     } catch (IoTDBConnectionException | StatementExecutionException e) {
-      removeSession();
+      tryConstructNewSession();
       putback = false;
     } finally {
       Session session = occupied.remove(wrapper.session);
@@ -336,13 +336,29 @@ public class SessionPool {
   }
 
   @SuppressWarnings({"squid:S2446"})
-  private synchronized void removeSession() {
-    logger.warn("Remove a broken Session {}, {}, {}", ip, port, user);
-    size--;
-    // we do not need to notifyAll as any waited thread can continue to work after waked up.
-    this.notify();
-    if (logger.isDebugEnabled()) {
-      logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
+  private void tryConstructNewSession() {
+    Session session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+    try {
+      session.open(enableCompression);
+      // avoid someone has called close() the session pool
+      synchronized (this) {
+        if (closed) {
+          // have to release the connection...
+          session.close();
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+        queue.push(session);
+        this.notify();
+      }
+    } catch (IoTDBConnectionException e) {
+      synchronized (this) {
+        size--;
+        // we do not need to notifyAll as any waited thread can continue to work after waked up.
+        this.notify();
+        if (logger.isDebugEnabled()) {
+          logger.debug("open session failed, reduce the count and notify others...");
+        }
+      }
     }
   }
 
@@ -360,7 +376,7 @@ public class SessionPool {
   private void cleanSessionAndMayThrowConnectionException(
       Session session, int times, IoTDBConnectionException e) throws IoTDBConnectionException {
     closeSession(session);
-    removeSession();
+    tryConstructNewSession();
     if (times == FINAL_RETRY) {
       throw new IoTDBConnectionException(
           String.format(