You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/24 08:19:44 UTC

[iotdb] 04/05: add parameter connectionTimeoutInMs for SessionPool

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 815b4c23c752991ec2e9c7daddf0ac56d0a489c7
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 16:17:37 2021 +0800

    add parameter connectionTimeoutInMs for SessionPool
---
 .../java/org/apache/iotdb/session/Session.java     |   2 +-
 .../org/apache/iotdb/session/pool/SessionPool.java |  85 ++++++++++-------
 .../apache/iotdb/session/pool/SessionPoolTest.java | 103 +++++++++++++++++++--
 3 files changed, 148 insertions(+), 42 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 839d6d6..b28afcf 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -317,7 +317,7 @@ public class Session {
     open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
-  private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+  public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException {
     if (!isClosed) {
       return;
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index e4b712d..1beb0c5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -64,22 +64,27 @@ public class SessionPool {
   private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
   public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
   public static final String CLOSE_THE_SESSION_FAILED = "close the session failed.";
-  private static int RETRY = 3;
-  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+
+  private static final int RETRY = 3;
+  private static final int FINAL_RETRY = RETRY - 1;
+
+  private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
   // for session whose resultSet is not released.
-  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
   private int size = 0;
   private int maxSize = 0;
-  private String host;
-  private int port;
-  private String user;
-  private String password;
-  private int fetchSize;
-  private long timeout; // ms
-  private static int FINAL_RETRY = RETRY - 1;
-  private boolean enableCompression;
-  private boolean enableCacheLeader;
-  private ZoneId zoneId;
+
+  private final String host;
+  private final int port;
+  private final String user;
+  private final String password;
+  private final int fetchSize;
+  private final long waitToGetSessionTimeoutInMs;
+  private final int connectionTimeoutInMs;
+  private final boolean enableCompression;
+  private final boolean enableCacheLeader;
+  private final ZoneId zoneId;
 
   private boolean closed; // whether the queue is closed.
 
@@ -94,7 +99,8 @@ public class SessionPool {
         60_000,
         false,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -109,7 +115,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -130,7 +137,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        enableCacheLeader);
+        enableCacheLeader,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -145,7 +153,8 @@ public class SessionPool {
         60_000,
         false,
         zoneId,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   @SuppressWarnings("squid:S107")
@@ -156,20 +165,22 @@ public class SessionPool {
       String password,
       int maxSize,
       int fetchSize,
-      long timeout,
+      long waitToGetSessionTimeoutInMs,
       boolean enableCompression,
       ZoneId zoneId,
-      boolean enableCacheLeader) {
+      boolean enableCacheLeader,
+      int connectionTimeoutInMs) {
     this.maxSize = maxSize;
     this.host = host;
     this.port = port;
     this.user = user;
     this.password = password;
     this.fetchSize = fetchSize;
-    this.timeout = timeout;
+    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableCacheLeader = enableCacheLeader;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
@@ -204,7 +215,7 @@ public class SessionPool {
             logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
           this.wait(1000);
-          long time = timeout < 60_000 ? timeout : 60_000;
+          long time = waitToGetSessionTimeoutInMs < 60_000 ? waitToGetSessionTimeoutInMs : 60_000;
           if (System.currentTimeMillis() - start > time) {
             logger.warn(
                 "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
@@ -218,7 +229,7 @@ public class SessionPool {
                 occupied.size(),
                 queue.size(),
                 size);
-            if (System.currentTimeMillis() - start > timeout) {
+            if (System.currentTimeMillis() - start > waitToGetSessionTimeoutInMs) {
               throw new IoTDBConnectionException(
                   String.format("timeout to get a connection from %s:%s", host, port));
             }
@@ -243,7 +254,7 @@ public class SessionPool {
       }
       session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
       try {
-        session.open(enableCompression);
+        session.open(enableCompression, connectionTimeoutInMs);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -339,7 +350,7 @@ public class SessionPool {
   private void tryConstructNewSession() {
     Session session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
     try {
-      session.open(enableCompression);
+      session.open(enableCompression, connectionTimeoutInMs);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -1206,8 +1217,8 @@ public class SessionPool {
     return zoneId;
   }
 
-  public long getTimeout() {
-    return timeout;
+  public long getWaitToGetSessionTimeoutInMs() {
+    return waitToGetSessionTimeoutInMs;
   }
 
   public boolean isEnableCompression() {
@@ -1218,17 +1229,23 @@ public class SessionPool {
     return enableCacheLeader;
   }
 
+  public int getConnectionTimeoutInMs() {
+    return connectionTimeoutInMs;
+  }
+
   public static class Builder {
+
     private String host = Config.DEFAULT_HOST;
     private int port = Config.DEFAULT_PORT;
     private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE;
     private String user = Config.DEFAULT_USER;
     private String password = Config.DEFAULT_PASSWORD;
     private int fetchSize = Config.DEFAULT_FETCH_SIZE;
-    private long timeout = 60_000;
+    private long waitToGetSessionTimeoutInMs = 60_000;
     private boolean enableCompression = false;
     private ZoneId zoneId = null;
     private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
+    private int connectionTimeoutInMs = Config.DEFAULT_CONNECTION_TIMEOUT_MS;
 
     public Builder host(String host) {
       this.host = host;
@@ -1265,8 +1282,8 @@ public class SessionPool {
       return this;
     }
 
-    public Builder timeout(long timeout) {
-      this.timeout = timeout;
+    public Builder waitToGetSessionTimeoutInMs(long waitToGetSessionTimeoutInMs) {
+      this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
       return this;
     }
 
@@ -1280,6 +1297,11 @@ public class SessionPool {
       return this;
     }
 
+    public Builder connectionTimeoutInMs(int connectionTimeoutInMs) {
+      this.connectionTimeoutInMs = connectionTimeoutInMs;
+      return this;
+    }
+
     public SessionPool build() {
       return new SessionPool(
           host,
@@ -1288,10 +1310,11 @@ public class SessionPool {
           password,
           maxSize,
           fetchSize,
-          timeout,
+          waitToGetSessionTimeoutInMs,
           enableCompression,
           zoneId,
-          enableCacheLeader);
+          enableCacheLeader,
+          connectionTimeoutInMs);
     }
   }
 }
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 9116070..dc39d47 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Config;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -228,7 +229,18 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            6000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     SessionDataSetWrapper wrapper = null;
     try {
@@ -244,7 +256,19 @@ public class SessionPoolTest {
       EnvironmentUtils.stopDaemon();
 
       EnvironmentUtils.reactiveDaemon();
-      pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+      pool =
+          new SessionPool(
+              "127.0.0.1",
+              6667,
+              "root",
+              "root",
+              3,
+              1,
+              6000,
+              false,
+              null,
+              false,
+              Config.DEFAULT_CONNECTION_TIMEOUT_MS);
       correctQuery(pool);
       pool.close();
       return;
@@ -264,7 +288,19 @@ public class SessionPoolTest {
         pool.close();
         EnvironmentUtils.stopDaemon();
         EnvironmentUtils.reactiveDaemon();
-        pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        pool =
+            new SessionPool(
+                "127.0.0.1",
+                6667,
+                "root",
+                "root",
+                3,
+                1,
+                6000,
+                false,
+                null,
+                false,
+                Config.DEFAULT_CONNECTION_TIMEOUT_MS);
         correctQuery(pool);
         pool.close();
       } catch (StatementExecutionException es) {
@@ -283,10 +319,21 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestartButDataIsGotten() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     assertEquals(1, pool.currentAvailableSize());
-    SessionDataSetWrapper wrapper = null;
+    SessionDataSetWrapper wrapper;
     try {
       wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
       // user does not know what happens.
@@ -309,12 +356,35 @@ public class SessionPoolTest {
   @Test
   public void restart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     // stop the server.
     pool.close();
     EnvironmentUtils.stopDaemon();
-    pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+    pool =
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     // all this ten data will fail.
     write10Data(pool, false);
     // restart the server
@@ -344,7 +414,18 @@ public class SessionPoolTest {
   @Test
   public void testClose() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     pool.close();
     try {
       pool.insertRecord(
@@ -373,10 +454,11 @@ public class SessionPoolTest {
             .user("abc")
             .password("123")
             .fetchSize(1)
-            .timeout(2)
+            .waitToGetSessionTimeoutInMs(2)
             .enableCacheLeader(true)
             .enableCompression(true)
             .zoneId(ZoneOffset.UTC)
+            .connectionTimeoutInMs(3)
             .build();
 
     assertEquals("localhost", pool.getHost());
@@ -385,9 +467,10 @@ public class SessionPoolTest {
     assertEquals("123", pool.getPassword());
     assertEquals(10, pool.getMaxSize());
     assertEquals(1, pool.getFetchSize());
-    assertEquals(2, pool.getTimeout());
+    assertEquals(2, pool.getWaitToGetSessionTimeoutInMs());
     assertTrue(pool.isEnableCacheLeader());
     assertTrue(pool.isEnableCompression());
+    assertEquals(3, pool.getConnectionTimeoutInMs());
     assertEquals(ZoneOffset.UTC, pool.getZoneId());
   }
 }