You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/24 08:19:40 UTC

[iotdb] branch session-pool-optimization created (now 744278d)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 744278d  better struct

This branch includes the following new commits:

     new 2d96cc4  optimize session creation
     new 4dcf1e2  removeSession -> tryConstructNewSession
     new df8814f  rename timeout in Session
     new 815b4c2  add parameter connectionTimeoutInMs for SessionPool
     new 744278d  better struct

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 05/05: better struct

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 744278d8421dd834534654d31e347dd9a3d9de44
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 16:19:07 2021 +0800

    better struct
---
 .../java/org/apache/iotdb/session/pool/SessionPool.java     | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 1beb0c5..e94f134 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -71,22 +71,25 @@ public class SessionPool {
   private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
   // for session whose resultSet is not released.
   private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
-
   private int size = 0;
   private int maxSize = 0;
+  private final long waitToGetSessionTimeoutInMs;
 
+  // parameters for Session constructor
   private final String host;
   private final int port;
   private final String user;
   private final String password;
   private final int fetchSize;
-  private final long waitToGetSessionTimeoutInMs;
+  private final ZoneId zoneId;
+  private final boolean enableCacheLeader;
+
+  // parameters for Session#open()
   private final int connectionTimeoutInMs;
   private final boolean enableCompression;
-  private final boolean enableCacheLeader;
-  private final ZoneId zoneId;
 
-  private boolean closed; // whether the queue is closed.
+  // whether the queue is closed.
+  private boolean closed;
 
   public SessionPool(String host, int port, String user, String password, int maxSize) {
     this(

[iotdb] 03/05: rename timeout in Session

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit df8814fde102a50c0eaa0f0c2853ba5d0ea4e9a9
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 16:00:57 2021 +0800

    rename timeout in Session
---
 .../src/main/java/org/apache/iotdb/SessionExample.java |  2 +-
 .../main/java/org/apache/iotdb/session/Session.java    | 18 +++++++++---------
 .../java/org/apache/iotdb/session/SessionTest.java     |  6 +++---
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 89e1a5d..a043962 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -712,7 +712,7 @@ public class SessionExample {
 
   private static void setTimeout() throws StatementExecutionException {
     Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000);
-    tempSession.setTimeout(60000);
+    tempSession.setQueryTimeout(60000);
   }
 
   private static void createClusterSession() throws IoTDBConnectionException {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2dd122f..839d6d6 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -84,7 +84,7 @@ public class Session {
    * Timeout of query can be set by users. If not set, default value 0 will be used, which will use
    * server configuration.
    */
-  private long timeout = 0;
+  private long queryTimeoutInMs = 0;
 
   protected boolean enableRPCCompression;
   protected int connectionTimeoutInMs;
@@ -170,7 +170,7 @@ public class Session {
         Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
         Config.DEFAULT_MAX_FRAME_SIZE,
         Config.DEFAULT_CACHE_LEADER_MODE);
-    this.timeout = timeoutInMs;
+    this.queryTimeoutInMs = timeoutInMs;
   }
 
   public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
@@ -537,18 +537,18 @@ public class Session {
 
   public boolean checkTimeseriesExists(String path)
       throws IoTDBConnectionException, StatementExecutionException {
-    return defaultSessionConnection.checkTimeseriesExists(path, timeout);
+    return defaultSessionConnection.checkTimeseriesExists(path, queryTimeoutInMs);
   }
 
-  public void setTimeout(long timeoutInMs) throws StatementExecutionException {
+  public void setQueryTimeout(long timeoutInMs) throws StatementExecutionException {
     if (timeoutInMs < 0) {
       throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
     }
-    this.timeout = timeoutInMs;
+    this.queryTimeoutInMs = timeoutInMs;
   }
 
-  public long getTimeout() {
-    return timeout;
+  public long getQueryTimeout() {
+    return queryTimeoutInMs;
   }
 
   /**
@@ -559,7 +559,7 @@ public class Session {
    */
   public SessionDataSet executeQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
-    return executeStatementMayRedirect(sql, timeout);
+    return executeStatementMayRedirect(sql, queryTimeoutInMs);
   }
 
   /**
@@ -601,7 +601,7 @@ public class Session {
             e.getEndPoint());
         // retry
         try {
-          return defaultSessionConnection.executeQueryStatement(sql, timeout);
+          return defaultSessionConnection.executeQueryStatement(sql, queryTimeoutInMs);
         } catch (RedirectException redirectException) {
           logger.error("{} redirect twice", sql, redirectException);
           throw new StatementExecutionException(sql + " redirect twice, please try again.");
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionTest.java b/session/src/test/java/org/apache/iotdb/session/SessionTest.java
index ad06d3d..144910f 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionTest.java
@@ -200,9 +200,9 @@ public class SessionTest {
   @Test
   public void setTimeout() throws StatementExecutionException {
     session = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
-    Assert.assertEquals(20000, session.getTimeout());
-    session.setTimeout(60000);
-    Assert.assertEquals(60000, session.getTimeout());
+    Assert.assertEquals(20000, session.getQueryTimeout());
+    session.setQueryTimeout(60000);
+    Assert.assertEquals(60000, session.getQueryTimeout());
   }
 
   @Test

[iotdb] 02/05: removeSession -> tryConstructNewSession

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4dcf1e273078a9651bb94433aa8cfbab3d49ef08
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 15:57:06 2021 +0800

    removeSession -> tryConstructNewSession
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 34 ++++++++++++++++------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 6939438..e4b712d 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -325,7 +325,7 @@ public class SessionPool {
     try {
       wrapper.sessionDataSet.closeOperationHandle();
     } catch (IoTDBConnectionException | StatementExecutionException e) {
-      removeSession();
+      tryConstructNewSession();
       putback = false;
     } finally {
       Session session = occupied.remove(wrapper.session);
@@ -336,13 +336,29 @@ public class SessionPool {
   }
 
   @SuppressWarnings({"squid:S2446"})
-  private synchronized void removeSession() {
-    logger.warn("Remove a broken Session {}, {}, {}", host, port, user);
-    size--;
-    // we do not need to notifyAll as any waited thread can continue to work after waked up.
-    this.notify();
-    if (logger.isDebugEnabled()) {
-      logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
+  private void tryConstructNewSession() {
+    Session session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+    try {
+      session.open(enableCompression);
+      // avoid someone has called close() the session pool
+      synchronized (this) {
+        if (closed) {
+          // have to release the connection...
+          session.close();
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+        queue.push(session);
+        this.notify();
+      }
+    } catch (IoTDBConnectionException e) {
+      synchronized (this) {
+        size--;
+        // we do not need to notifyAll as any waited thread can continue to work after waked up.
+        this.notify();
+        if (logger.isDebugEnabled()) {
+          logger.debug("open session failed, reduce the count and notify others...");
+        }
+      }
     }
   }
 
@@ -360,7 +376,7 @@ public class SessionPool {
   private void cleanSessionAndMayThrowConnectionException(
       Session session, int times, IoTDBConnectionException e) throws IoTDBConnectionException {
     closeSession(session);
-    removeSession();
+    tryConstructNewSession();
     if (times == FINAL_RETRY) {
       throw new IoTDBConnectionException(
           String.format(

[iotdb] 01/05: optimize session creation

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2d96cc4f788f0e9ebddc60269d8d7163af8c46d3
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 15:55:40 2021 +0800

    optimize session creation
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 140 ++++++++++-----------
 1 file changed, 70 insertions(+), 70 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 2effaa2..6939438 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -174,7 +174,6 @@ public class SessionPool {
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
   // is incorrect.
-
   @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
   private Session getSession() throws IoTDBConnectionException {
     Session session = queue.poll();
@@ -183,91 +182,92 @@ public class SessionPool {
     }
     if (session != null) {
       return session;
-    } else {
-      long start = System.currentTimeMillis();
-      boolean canCreate = false;
+    }
+
+    boolean shouldCreate = false;
+
+    long start = System.currentTimeMillis();
+    while (session == null) {
       synchronized (this) {
         if (size < maxSize) {
           // we can create more session
           size++;
-          canCreate = true;
+          shouldCreate = true;
           // but we do it after skip synchronized block because connection a session is time
           // consuming.
+          break;
         }
-      }
-      if (canCreate) {
-        // create a new one.
-        if (logger.isDebugEnabled()) {
-          logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password);
-        }
-        session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+
+        // we have to wait for someone returns a session.
         try {
-          session.open(enableCompression);
-          // avoid someone has called close() the session pool
-          synchronized (this) {
-            if (closed) {
-              // have to release the connection...
-              session.close();
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            } else {
-              return session;
-            }
+          if (logger.isDebugEnabled()) {
+            logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
-        } catch (IoTDBConnectionException e) {
-          // if exception, we will throw the exception.
-          // Meanwhile, we have to set size--
-          synchronized (this) {
-            size--;
-            // we do not need to notifyAll as any waited thread can continue to work after waked up.
-            this.notify();
-            if (logger.isDebugEnabled()) {
-              logger.debug("open session failed, reduce the count and notify others...");
+          this.wait(1000);
+          long time = timeout < 60_000 ? timeout : 60_000;
+          if (System.currentTimeMillis() - start > time) {
+            logger.warn(
+                "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                (System.currentTimeMillis() - start) / 1000,
+                host,
+                port,
+                user,
+                password);
+            logger.warn(
+                "current occupied size {}, queue size {}, considered size {} ",
+                occupied.size(),
+                queue.size(),
+                size);
+            if (System.currentTimeMillis() - start > timeout) {
+              throw new IoTDBConnectionException(
+                  String.format("timeout to get a connection from %s:%s", host, port));
             }
           }
-          throw e;
+        } catch (InterruptedException e) {
+          logger.error("the SessionPool is damaged", e);
+          Thread.currentThread().interrupt();
         }
-      } else {
-        while (session == null) {
-          synchronized (this) {
-            if (closed) {
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            }
-            // we have to wait for someone returns a session.
-            try {
-              if (logger.isDebugEnabled()) {
-                logger.debug(
-                    "no more sessions can be created, wait... queue.size={}", queue.size());
-              }
-              this.wait(1000);
-              long time = timeout < 60_000 ? timeout : 60_000;
-              if (System.currentTimeMillis() - start > time) {
-                logger.warn(
-                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
-                    (System.currentTimeMillis() - start) / 1000,
-                    host,
-                    port,
-                    user,
-                    password);
-                logger.warn(
-                    "current occupied size {}, queue size {}, considered size {} ",
-                    occupied.size(),
-                    queue.size(),
-                    size);
-                if (System.currentTimeMillis() - start > timeout) {
-                  throw new IoTDBConnectionException(
-                      String.format("timeout to get a connection from %s:%s", host, port));
-                }
-              }
-            } catch (InterruptedException e) {
-              logger.error("the SessionPool is damaged", e);
-              Thread.currentThread().interrupt();
-            }
-            session = queue.poll();
+
+        session = queue.poll();
+
+        if (closed) {
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+      }
+    }
+
+    if (shouldCreate) {
+      // create a new one.
+      if (logger.isDebugEnabled()) {
+        logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password);
+      }
+      session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+      try {
+        session.open(enableCompression);
+        // avoid someone has called close() the session pool
+        synchronized (this) {
+          if (closed) {
+            // have to release the connection...
+            session.close();
+            throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
           }
         }
-        return session;
+      } catch (IoTDBConnectionException e) {
+        // if exception, we will throw the exception.
+        // Meanwhile, we have to set size--
+        synchronized (this) {
+          size--;
+          // we do not need to notifyAll as any waited thread can continue to work after waked up.
+          this.notify();
+          if (logger.isDebugEnabled()) {
+            logger.debug("open session failed, reduce the count and notify others...");
+          }
+        }
+        throw e;
       }
     }
+
+    return session;
   }
 
   public int currentAvailableSize() {

[iotdb] 04/05: add parameter connectionTimeoutInMs for SessionPool

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 815b4c23c752991ec2e9c7daddf0ac56d0a489c7
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 16:17:37 2021 +0800

    add parameter connectionTimeoutInMs for SessionPool
---
 .../java/org/apache/iotdb/session/Session.java     |   2 +-
 .../org/apache/iotdb/session/pool/SessionPool.java |  85 ++++++++++-------
 .../apache/iotdb/session/pool/SessionPoolTest.java | 103 +++++++++++++++++++--
 3 files changed, 148 insertions(+), 42 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 839d6d6..b28afcf 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -317,7 +317,7 @@ public class Session {
     open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
-  private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+  public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException {
     if (!isClosed) {
       return;
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index e4b712d..1beb0c5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -64,22 +64,27 @@ public class SessionPool {
   private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
   public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
   public static final String CLOSE_THE_SESSION_FAILED = "close the session failed.";
-  private static int RETRY = 3;
-  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+
+  private static final int RETRY = 3;
+  private static final int FINAL_RETRY = RETRY - 1;
+
+  private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
   // for session whose resultSet is not released.
-  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
   private int size = 0;
   private int maxSize = 0;
-  private String host;
-  private int port;
-  private String user;
-  private String password;
-  private int fetchSize;
-  private long timeout; // ms
-  private static int FINAL_RETRY = RETRY - 1;
-  private boolean enableCompression;
-  private boolean enableCacheLeader;
-  private ZoneId zoneId;
+
+  private final String host;
+  private final int port;
+  private final String user;
+  private final String password;
+  private final int fetchSize;
+  private final long waitToGetSessionTimeoutInMs;
+  private final int connectionTimeoutInMs;
+  private final boolean enableCompression;
+  private final boolean enableCacheLeader;
+  private final ZoneId zoneId;
 
   private boolean closed; // whether the queue is closed.
 
@@ -94,7 +99,8 @@ public class SessionPool {
         60_000,
         false,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -109,7 +115,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -130,7 +137,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        enableCacheLeader);
+        enableCacheLeader,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -145,7 +153,8 @@ public class SessionPool {
         60_000,
         false,
         zoneId,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   @SuppressWarnings("squid:S107")
@@ -156,20 +165,22 @@ public class SessionPool {
       String password,
       int maxSize,
       int fetchSize,
-      long timeout,
+      long waitToGetSessionTimeoutInMs,
       boolean enableCompression,
       ZoneId zoneId,
-      boolean enableCacheLeader) {
+      boolean enableCacheLeader,
+      int connectionTimeoutInMs) {
     this.maxSize = maxSize;
     this.host = host;
     this.port = port;
     this.user = user;
     this.password = password;
     this.fetchSize = fetchSize;
-    this.timeout = timeout;
+    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableCacheLeader = enableCacheLeader;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
@@ -204,7 +215,7 @@ public class SessionPool {
             logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
           this.wait(1000);
-          long time = timeout < 60_000 ? timeout : 60_000;
+          long time = waitToGetSessionTimeoutInMs < 60_000 ? waitToGetSessionTimeoutInMs : 60_000;
           if (System.currentTimeMillis() - start > time) {
             logger.warn(
                 "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
@@ -218,7 +229,7 @@ public class SessionPool {
                 occupied.size(),
                 queue.size(),
                 size);
-            if (System.currentTimeMillis() - start > timeout) {
+            if (System.currentTimeMillis() - start > waitToGetSessionTimeoutInMs) {
               throw new IoTDBConnectionException(
                   String.format("timeout to get a connection from %s:%s", host, port));
             }
@@ -243,7 +254,7 @@ public class SessionPool {
       }
       session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
       try {
-        session.open(enableCompression);
+        session.open(enableCompression, connectionTimeoutInMs);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -339,7 +350,7 @@ public class SessionPool {
   private void tryConstructNewSession() {
     Session session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
     try {
-      session.open(enableCompression);
+      session.open(enableCompression, connectionTimeoutInMs);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -1206,8 +1217,8 @@ public class SessionPool {
     return zoneId;
   }
 
-  public long getTimeout() {
-    return timeout;
+  public long getWaitToGetSessionTimeoutInMs() {
+    return waitToGetSessionTimeoutInMs;
   }
 
   public boolean isEnableCompression() {
@@ -1218,17 +1229,23 @@ public class SessionPool {
     return enableCacheLeader;
   }
 
+  public int getConnectionTimeoutInMs() {
+    return connectionTimeoutInMs;
+  }
+
   public static class Builder {
+
     private String host = Config.DEFAULT_HOST;
     private int port = Config.DEFAULT_PORT;
     private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE;
     private String user = Config.DEFAULT_USER;
     private String password = Config.DEFAULT_PASSWORD;
     private int fetchSize = Config.DEFAULT_FETCH_SIZE;
-    private long timeout = 60_000;
+    private long waitToGetSessionTimeoutInMs = 60_000;
     private boolean enableCompression = false;
     private ZoneId zoneId = null;
     private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
+    private int connectionTimeoutInMs = Config.DEFAULT_CONNECTION_TIMEOUT_MS;
 
     public Builder host(String host) {
       this.host = host;
@@ -1265,8 +1282,8 @@ public class SessionPool {
       return this;
     }
 
-    public Builder timeout(long timeout) {
-      this.timeout = timeout;
+    public Builder waitToGetSessionTimeoutInMs(long waitToGetSessionTimeoutInMs) {
+      this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
       return this;
     }
 
@@ -1280,6 +1297,11 @@ public class SessionPool {
       return this;
     }
 
+    public Builder connectionTimeoutInMs(int connectionTimeoutInMs) {
+      this.connectionTimeoutInMs = connectionTimeoutInMs;
+      return this;
+    }
+
     public SessionPool build() {
       return new SessionPool(
           host,
@@ -1288,10 +1310,11 @@ public class SessionPool {
           password,
           maxSize,
           fetchSize,
-          timeout,
+          waitToGetSessionTimeoutInMs,
           enableCompression,
           zoneId,
-          enableCacheLeader);
+          enableCacheLeader,
+          connectionTimeoutInMs);
     }
   }
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 9116070..dc39d47 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Config;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -228,7 +229,18 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            6000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     SessionDataSetWrapper wrapper = null;
     try {
@@ -244,7 +256,19 @@ public class SessionPoolTest {
       EnvironmentUtils.stopDaemon();
 
       EnvironmentUtils.reactiveDaemon();
-      pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+      pool =
+          new SessionPool(
+              "127.0.0.1",
+              6667,
+              "root",
+              "root",
+              3,
+              1,
+              6000,
+              false,
+              null,
+              false,
+              Config.DEFAULT_CONNECTION_TIMEOUT_MS);
       correctQuery(pool);
       pool.close();
       return;
@@ -264,7 +288,19 @@ public class SessionPoolTest {
         pool.close();
         EnvironmentUtils.stopDaemon();
         EnvironmentUtils.reactiveDaemon();
-        pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        pool =
+            new SessionPool(
+                "127.0.0.1",
+                6667,
+                "root",
+                "root",
+                3,
+                1,
+                6000,
+                false,
+                null,
+                false,
+                Config.DEFAULT_CONNECTION_TIMEOUT_MS);
         correctQuery(pool);
         pool.close();
       } catch (StatementExecutionException es) {
@@ -283,10 +319,21 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestartButDataIsGotten() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     assertEquals(1, pool.currentAvailableSize());
-    SessionDataSetWrapper wrapper = null;
+    SessionDataSetWrapper wrapper;
     try {
       wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
       // user does not know what happens.
@@ -309,12 +356,35 @@ public class SessionPoolTest {
   @Test
   public void restart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     // stop the server.
     pool.close();
     EnvironmentUtils.stopDaemon();
-    pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+    pool =
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     // all this ten data will fail.
     write10Data(pool, false);
     // restart the server
@@ -344,7 +414,18 @@ public class SessionPoolTest {
   @Test
   public void testClose() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     pool.close();
     try {
       pool.insertRecord(
@@ -373,10 +454,11 @@ public class SessionPoolTest {
             .user("abc")
             .password("123")
             .fetchSize(1)
-            .timeout(2)
+            .waitToGetSessionTimeoutInMs(2)
             .enableCacheLeader(true)
             .enableCompression(true)
             .zoneId(ZoneOffset.UTC)
+            .connectionTimeoutInMs(3)
             .build();
 
     assertEquals("localhost", pool.getHost());
@@ -385,9 +467,10 @@ public class SessionPoolTest {
     assertEquals("123", pool.getPassword());
     assertEquals(10, pool.getMaxSize());
     assertEquals(1, pool.getFetchSize());
-    assertEquals(2, pool.getTimeout());
+    assertEquals(2, pool.getWaitToGetSessionTimeoutInMs());
     assertTrue(pool.isEnableCacheLeader());
     assertTrue(pool.isEnableCompression());
+    assertEquals(3, pool.getConnectionTimeoutInMs());
     assertEquals(ZoneOffset.UTC, pool.getZoneId());
   }
 }