You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/08/24 09:44:07 UTC

[iotdb] branch master updated: [IOTDB-1587] SessionPool optimization: a more aggressive Session creation strategy (#3823)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 62135fc  [IOTDB-1587] SessionPool optimization: a more aggressive Session creation strategy (#3823)
62135fc is described below

commit 62135fc866fac5f165e2350fa0579459c318910d
Author: Steve Yurong Su (宇荣) <ro...@apache.org>
AuthorDate: Tue Aug 24 04:43:42 2021 -0500

    [IOTDB-1587] SessionPool optimization: a more aggressive Session creation strategy (#3823)
    
    1. rename timeout to queryTimeoutInMs in Session
    2. allow users to set connectionTimeoutInMs in SessionPool
    3. new Session creation strategy in SessionPool: removeSession -> tryConstructNewSession
    4. new Session creation strategy in SessionPool: when the Session cannot be obtained and the thread enters the waiting loop, it will always judge whether a new Session can be created first in each loop, instead of waiting in each loop after judging whether a new Session can be created only once.
---
 .../main/java/org/apache/iotdb/SessionExample.java |   2 +-
 .../java/org/apache/iotdb/session/Session.java     |  27 ++-
 .../org/apache/iotdb/session/pool/SessionPool.java | 258 ++++++++++++---------
 .../java/org/apache/iotdb/session/SessionTest.java |   6 +-
 .../apache/iotdb/session/pool/SessionPoolTest.java | 103 +++++++-
 5 files changed, 263 insertions(+), 133 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 89e1a5d..a043962 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -712,7 +712,7 @@ public class SessionExample {
 
   private static void setTimeout() throws StatementExecutionException {
     Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000);
-    tempSession.setTimeout(60000);
+    tempSession.setQueryTimeout(60000);
   }
 
   private static void createClusterSession() throws IoTDBConnectionException {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2dd122f..039c186 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -84,7 +84,7 @@ public class Session {
    * Timeout of query can be set by users. If not set, default value 0 will be used, which will use
    * server configuration.
    */
-  private long timeout = 0;
+  private long queryTimeoutInMs = 0;
 
   protected boolean enableRPCCompression;
   protected int connectionTimeoutInMs;
@@ -159,7 +159,12 @@ public class Session {
   }
 
   public Session(
-      String host, int rpcPort, String username, String password, int fetchSize, long timeoutInMs) {
+      String host,
+      int rpcPort,
+      String username,
+      String password,
+      int fetchSize,
+      long queryTimeoutInMs) {
     this(
         host,
         rpcPort,
@@ -170,7 +175,7 @@ public class Session {
         Config.DEFAULT_INITIAL_BUFFER_CAPACITY,
         Config.DEFAULT_MAX_FRAME_SIZE,
         Config.DEFAULT_CACHE_LEADER_MODE);
-    this.timeout = timeoutInMs;
+    this.queryTimeoutInMs = queryTimeoutInMs;
   }
 
   public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
@@ -317,7 +322,7 @@ public class Session {
     open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
-  private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+  public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException {
     if (!isClosed) {
       return;
@@ -537,18 +542,18 @@ public class Session {
 
   public boolean checkTimeseriesExists(String path)
       throws IoTDBConnectionException, StatementExecutionException {
-    return defaultSessionConnection.checkTimeseriesExists(path, timeout);
+    return defaultSessionConnection.checkTimeseriesExists(path, queryTimeoutInMs);
   }
 
-  public void setTimeout(long timeoutInMs) throws StatementExecutionException {
+  public void setQueryTimeout(long timeoutInMs) throws StatementExecutionException {
     if (timeoutInMs < 0) {
       throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
     }
-    this.timeout = timeoutInMs;
+    this.queryTimeoutInMs = timeoutInMs;
   }
 
-  public long getTimeout() {
-    return timeout;
+  public long getQueryTimeout() {
+    return queryTimeoutInMs;
   }
 
   /**
@@ -559,7 +564,7 @@ public class Session {
    */
   public SessionDataSet executeQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
-    return executeStatementMayRedirect(sql, timeout);
+    return executeStatementMayRedirect(sql, queryTimeoutInMs);
   }
 
   /**
@@ -601,7 +606,7 @@ public class Session {
             e.getEndPoint());
         // retry
         try {
-          return defaultSessionConnection.executeQueryStatement(sql, timeout);
+          return defaultSessionConnection.executeQueryStatement(sql, queryTimeoutInMs);
         } catch (RedirectException redirectException) {
           logger.error("{} redirect twice", sql, redirectException);
           throw new StatementExecutionException(sql + " redirect twice, please try again.");
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 2effaa2..e94f134 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -64,24 +64,32 @@ public class SessionPool {
   private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
   public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
   public static final String CLOSE_THE_SESSION_FAILED = "close the session failed.";
-  private static int RETRY = 3;
-  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+
+  private static final int RETRY = 3;
+  private static final int FINAL_RETRY = RETRY - 1;
+
+  private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
   // for session whose resultSet is not released.
-  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
   private int size = 0;
   private int maxSize = 0;
-  private String host;
-  private int port;
-  private String user;
-  private String password;
-  private int fetchSize;
-  private long timeout; // ms
-  private static int FINAL_RETRY = RETRY - 1;
-  private boolean enableCompression;
-  private boolean enableCacheLeader;
-  private ZoneId zoneId;
-
-  private boolean closed; // whether the queue is closed.
+  private final long waitToGetSessionTimeoutInMs;
+
+  // parameters for Session constructor
+  private final String host;
+  private final int port;
+  private final String user;
+  private final String password;
+  private final int fetchSize;
+  private final ZoneId zoneId;
+  private final boolean enableCacheLeader;
+
+  // parameters for Session#open()
+  private final int connectionTimeoutInMs;
+  private final boolean enableCompression;
+
+  // whether the queue is closed.
+  private boolean closed;
 
   public SessionPool(String host, int port, String user, String password, int maxSize) {
     this(
@@ -94,7 +102,8 @@ public class SessionPool {
         60_000,
         false,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -109,7 +118,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -130,7 +140,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        enableCacheLeader);
+        enableCacheLeader,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -145,7 +156,8 @@ public class SessionPool {
         60_000,
         false,
         zoneId,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   @SuppressWarnings("squid:S107")
@@ -156,25 +168,26 @@ public class SessionPool {
       String password,
       int maxSize,
       int fetchSize,
-      long timeout,
+      long waitToGetSessionTimeoutInMs,
       boolean enableCompression,
       ZoneId zoneId,
-      boolean enableCacheLeader) {
+      boolean enableCacheLeader,
+      int connectionTimeoutInMs) {
     this.maxSize = maxSize;
     this.host = host;
     this.port = port;
     this.user = user;
     this.password = password;
     this.fetchSize = fetchSize;
-    this.timeout = timeout;
+    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableCacheLeader = enableCacheLeader;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
   // is incorrect.
-
   @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
   private Session getSession() throws IoTDBConnectionException {
     Session session = queue.poll();
@@ -183,91 +196,92 @@ public class SessionPool {
     }
     if (session != null) {
       return session;
-    } else {
-      long start = System.currentTimeMillis();
-      boolean canCreate = false;
+    }
+
+    boolean shouldCreate = false;
+
+    long start = System.currentTimeMillis();
+    while (session == null) {
       synchronized (this) {
         if (size < maxSize) {
           // we can create more session
           size++;
-          canCreate = true;
+          shouldCreate = true;
           // but we do it after skip synchronized block because connection a session is time
           // consuming.
+          break;
         }
-      }
-      if (canCreate) {
-        // create a new one.
-        if (logger.isDebugEnabled()) {
-          logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password);
-        }
-        session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+
+        // we have to wait for someone returns a session.
         try {
-          session.open(enableCompression);
-          // avoid someone has called close() the session pool
-          synchronized (this) {
-            if (closed) {
-              // have to release the connection...
-              session.close();
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            } else {
-              return session;
-            }
+          if (logger.isDebugEnabled()) {
+            logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
-        } catch (IoTDBConnectionException e) {
-          // if exception, we will throw the exception.
-          // Meanwhile, we have to set size--
-          synchronized (this) {
-            size--;
-            // we do not need to notifyAll as any waited thread can continue to work after waked up.
-            this.notify();
-            if (logger.isDebugEnabled()) {
-              logger.debug("open session failed, reduce the count and notify others...");
+          this.wait(1000);
+          long time = waitToGetSessionTimeoutInMs < 60_000 ? waitToGetSessionTimeoutInMs : 60_000;
+          if (System.currentTimeMillis() - start > time) {
+            logger.warn(
+                "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                (System.currentTimeMillis() - start) / 1000,
+                host,
+                port,
+                user,
+                password);
+            logger.warn(
+                "current occupied size {}, queue size {}, considered size {} ",
+                occupied.size(),
+                queue.size(),
+                size);
+            if (System.currentTimeMillis() - start > waitToGetSessionTimeoutInMs) {
+              throw new IoTDBConnectionException(
+                  String.format("timeout to get a connection from %s:%s", host, port));
             }
           }
-          throw e;
+        } catch (InterruptedException e) {
+          logger.error("the SessionPool is damaged", e);
+          Thread.currentThread().interrupt();
         }
-      } else {
-        while (session == null) {
-          synchronized (this) {
-            if (closed) {
-              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
-            }
-            // we have to wait for someone returns a session.
-            try {
-              if (logger.isDebugEnabled()) {
-                logger.debug(
-                    "no more sessions can be created, wait... queue.size={}", queue.size());
-              }
-              this.wait(1000);
-              long time = timeout < 60_000 ? timeout : 60_000;
-              if (System.currentTimeMillis() - start > time) {
-                logger.warn(
-                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
-                    (System.currentTimeMillis() - start) / 1000,
-                    host,
-                    port,
-                    user,
-                    password);
-                logger.warn(
-                    "current occupied size {}, queue size {}, considered size {} ",
-                    occupied.size(),
-                    queue.size(),
-                    size);
-                if (System.currentTimeMillis() - start > timeout) {
-                  throw new IoTDBConnectionException(
-                      String.format("timeout to get a connection from %s:%s", host, port));
-                }
-              }
-            } catch (InterruptedException e) {
-              logger.error("the SessionPool is damaged", e);
-              Thread.currentThread().interrupt();
-            }
-            session = queue.poll();
+
+        session = queue.poll();
+
+        if (closed) {
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+      }
+    }
+
+    if (shouldCreate) {
+      // create a new one.
+      if (logger.isDebugEnabled()) {
+        logger.debug("Create a new Session {}, {}, {}, {}", host, port, user, password);
+      }
+      session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+      try {
+        session.open(enableCompression, connectionTimeoutInMs);
+        // avoid someone has called close() the session pool
+        synchronized (this) {
+          if (closed) {
+            // have to release the connection...
+            session.close();
+            throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+          }
+        }
+      } catch (IoTDBConnectionException e) {
+        // if exception, we will throw the exception.
+        // Meanwhile, we have to set size--
+        synchronized (this) {
+          size--;
+          // we do not need to notifyAll as any waited thread can continue to work after waked up.
+          this.notify();
+          if (logger.isDebugEnabled()) {
+            logger.debug("open session failed, reduce the count and notify others...");
           }
         }
-        return session;
+        throw e;
       }
     }
+
+    return session;
   }
 
   public int currentAvailableSize() {
@@ -325,7 +339,7 @@ public class SessionPool {
     try {
       wrapper.sessionDataSet.closeOperationHandle();
     } catch (IoTDBConnectionException | StatementExecutionException e) {
-      removeSession();
+      tryConstructNewSession();
       putback = false;
     } finally {
       Session session = occupied.remove(wrapper.session);
@@ -336,13 +350,29 @@ public class SessionPool {
   }
 
   @SuppressWarnings({"squid:S2446"})
-  private synchronized void removeSession() {
-    logger.warn("Remove a broken Session {}, {}, {}", host, port, user);
-    size--;
-    // we do not need to notifyAll as any waited thread can continue to work after waked up.
-    this.notify();
-    if (logger.isDebugEnabled()) {
-      logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
+  private void tryConstructNewSession() {
+    Session session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
+    try {
+      session.open(enableCompression, connectionTimeoutInMs);
+      // avoid someone has called close() the session pool
+      synchronized (this) {
+        if (closed) {
+          // have to release the connection...
+          session.close();
+          throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+        }
+        queue.push(session);
+        this.notify();
+      }
+    } catch (IoTDBConnectionException e) {
+      synchronized (this) {
+        size--;
+        // we do not need to notifyAll as any waited thread can continue to work after waked up.
+        this.notify();
+        if (logger.isDebugEnabled()) {
+          logger.debug("open session failed, reduce the count and notify others...");
+        }
+      }
     }
   }
 
@@ -360,7 +390,7 @@ public class SessionPool {
   private void cleanSessionAndMayThrowConnectionException(
       Session session, int times, IoTDBConnectionException e) throws IoTDBConnectionException {
     closeSession(session);
-    removeSession();
+    tryConstructNewSession();
     if (times == FINAL_RETRY) {
       throw new IoTDBConnectionException(
           String.format(
@@ -1190,8 +1220,8 @@ public class SessionPool {
     return zoneId;
   }
 
-  public long getTimeout() {
-    return timeout;
+  public long getWaitToGetSessionTimeoutInMs() {
+    return waitToGetSessionTimeoutInMs;
   }
 
   public boolean isEnableCompression() {
@@ -1202,17 +1232,23 @@ public class SessionPool {
     return enableCacheLeader;
   }
 
+  public int getConnectionTimeoutInMs() {
+    return connectionTimeoutInMs;
+  }
+
   public static class Builder {
+
     private String host = Config.DEFAULT_HOST;
     private int port = Config.DEFAULT_PORT;
     private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE;
     private String user = Config.DEFAULT_USER;
     private String password = Config.DEFAULT_PASSWORD;
     private int fetchSize = Config.DEFAULT_FETCH_SIZE;
-    private long timeout = 60_000;
+    private long waitToGetSessionTimeoutInMs = 60_000;
     private boolean enableCompression = false;
     private ZoneId zoneId = null;
     private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
+    private int connectionTimeoutInMs = Config.DEFAULT_CONNECTION_TIMEOUT_MS;
 
     public Builder host(String host) {
       this.host = host;
@@ -1249,8 +1285,8 @@ public class SessionPool {
       return this;
     }
 
-    public Builder timeout(long timeout) {
-      this.timeout = timeout;
+    public Builder waitToGetSessionTimeoutInMs(long waitToGetSessionTimeoutInMs) {
+      this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
       return this;
     }
 
@@ -1264,6 +1300,11 @@ public class SessionPool {
       return this;
     }
 
+    public Builder connectionTimeoutInMs(int connectionTimeoutInMs) {
+      this.connectionTimeoutInMs = connectionTimeoutInMs;
+      return this;
+    }
+
     public SessionPool build() {
       return new SessionPool(
           host,
@@ -1272,10 +1313,11 @@ public class SessionPool {
           password,
           maxSize,
           fetchSize,
-          timeout,
+          waitToGetSessionTimeoutInMs,
           enableCompression,
           zoneId,
-          enableCacheLeader);
+          enableCacheLeader,
+          connectionTimeoutInMs);
     }
   }
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionTest.java b/session/src/test/java/org/apache/iotdb/session/SessionTest.java
index ad06d3d..144910f 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionTest.java
@@ -200,9 +200,9 @@ public class SessionTest {
   @Test
   public void setTimeout() throws StatementExecutionException {
     session = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
-    Assert.assertEquals(20000, session.getTimeout());
-    session.setTimeout(60000);
-    Assert.assertEquals(60000, session.getTimeout());
+    Assert.assertEquals(20000, session.getQueryTimeout());
+    session.setQueryTimeout(60000);
+    Assert.assertEquals(60000, session.getQueryTimeout());
   }
 
   @Test
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 9116070..dc39d47 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Config;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -228,7 +229,18 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            6000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     SessionDataSetWrapper wrapper = null;
     try {
@@ -244,7 +256,19 @@ public class SessionPoolTest {
       EnvironmentUtils.stopDaemon();
 
       EnvironmentUtils.reactiveDaemon();
-      pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+      pool =
+          new SessionPool(
+              "127.0.0.1",
+              6667,
+              "root",
+              "root",
+              3,
+              1,
+              6000,
+              false,
+              null,
+              false,
+              Config.DEFAULT_CONNECTION_TIMEOUT_MS);
       correctQuery(pool);
       pool.close();
       return;
@@ -264,7 +288,19 @@ public class SessionPoolTest {
         pool.close();
         EnvironmentUtils.stopDaemon();
         EnvironmentUtils.reactiveDaemon();
-        pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        pool =
+            new SessionPool(
+                "127.0.0.1",
+                6667,
+                "root",
+                "root",
+                3,
+                1,
+                6000,
+                false,
+                null,
+                false,
+                Config.DEFAULT_CONNECTION_TIMEOUT_MS);
         correctQuery(pool);
         pool.close();
       } catch (StatementExecutionException es) {
@@ -283,10 +319,21 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestartButDataIsGotten() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     assertEquals(1, pool.currentAvailableSize());
-    SessionDataSetWrapper wrapper = null;
+    SessionDataSetWrapper wrapper;
     try {
       wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
       // user does not know what happens.
@@ -309,12 +356,35 @@ public class SessionPoolTest {
   @Test
   public void restart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     // stop the server.
     pool.close();
     EnvironmentUtils.stopDaemon();
-    pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+    pool =
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     // all this ten data will fail.
     write10Data(pool, false);
     // restart the server
@@ -344,7 +414,18 @@ public class SessionPoolTest {
   @Test
   public void testClose() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     pool.close();
     try {
       pool.insertRecord(
@@ -373,10 +454,11 @@ public class SessionPoolTest {
             .user("abc")
             .password("123")
             .fetchSize(1)
-            .timeout(2)
+            .waitToGetSessionTimeoutInMs(2)
             .enableCacheLeader(true)
             .enableCompression(true)
             .zoneId(ZoneOffset.UTC)
+            .connectionTimeoutInMs(3)
             .build();
 
     assertEquals("localhost", pool.getHost());
@@ -385,9 +467,10 @@ public class SessionPoolTest {
     assertEquals("123", pool.getPassword());
     assertEquals(10, pool.getMaxSize());
     assertEquals(1, pool.getFetchSize());
-    assertEquals(2, pool.getTimeout());
+    assertEquals(2, pool.getWaitToGetSessionTimeoutInMs());
     assertTrue(pool.isEnableCacheLeader());
     assertTrue(pool.isEnableCompression());
+    assertEquals(3, pool.getConnectionTimeoutInMs());
     assertEquals(ZoneOffset.UTC, pool.getZoneId());
   }
 }