You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/24 03:42:26 UTC

[iotdb] 04/04: add parameter connectionTimeoutInMs for SessionPool

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch session-pool-optimization-0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eb4aa5393a8071374bdbffbd3d8bbc91463955ad
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 11:41:43 2021 +0800

    add parameter connectionTimeoutInMs for SessionPool
---
 .../java/org/apache/iotdb/session/Session.java     |  2 +-
 .../org/apache/iotdb/session/pool/SessionPool.java | 59 ++++++++------
 .../apache/iotdb/session/pool/SessionPoolTest.java | 95 ++++++++++++++++++++--
 3 files changed, 124 insertions(+), 32 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 20079c9..06b3536 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -314,7 +314,7 @@ public class Session {
     open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
-  private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+  public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException {
     if (!isClosed) {
       return;
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index d8b2991..5fa2c7e 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -64,22 +64,27 @@ public class SessionPool {
   private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
   public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
   public static final String CLOSE_THE_SESSION_FAILED = "close the session failed.";
-  private static int RETRY = 3;
-  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+
+  private static final int RETRY = 3;
+  private static final int FINAL_RETRY = RETRY - 1;
+
+  private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
   // for session whose resultSet is not released.
-  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
   private int size = 0;
   private int maxSize = 0;
-  private String ip;
-  private int port;
-  private String user;
-  private String password;
-  private int fetchSize;
-  private long timeout; // ms
-  private static int FINAL_RETRY = RETRY - 1;
-  private boolean enableCompression;
-  private boolean enableCacheLeader;
-  private ZoneId zoneId;
+
+  private final String ip;
+  private final int port;
+  private final String user;
+  private final String password;
+  private final int fetchSize;
+  private final long waitToGetSessionTimeoutInMs;
+  private final int connectionTimeoutInMs;
+  private final boolean enableCompression;
+  private final boolean enableCacheLeader;
+  private final ZoneId zoneId;
 
   private boolean closed; // whether the queue is closed.
 
@@ -94,7 +99,8 @@ public class SessionPool {
         60_000,
         false,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -109,7 +115,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -130,7 +137,8 @@ public class SessionPool {
         60_000,
         enableCompression,
         null,
-        enableCacheLeader);
+        enableCacheLeader,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public SessionPool(
@@ -145,7 +153,8 @@ public class SessionPool {
         60_000,
         false,
         zoneId,
-        Config.DEFAULT_CACHE_LEADER_MODE);
+        Config.DEFAULT_CACHE_LEADER_MODE,
+        Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   @SuppressWarnings("squid:S107")
@@ -156,20 +165,22 @@ public class SessionPool {
       String password,
       int maxSize,
       int fetchSize,
-      long timeout,
+      long waitToGetSessionTimeoutInMs,
       boolean enableCompression,
       ZoneId zoneId,
-      boolean enableCacheLeader) {
+      boolean enableCacheLeader,
+      int connectionTimeoutInMs) {
     this.maxSize = maxSize;
     this.ip = ip;
     this.port = port;
     this.user = user;
     this.password = password;
     this.fetchSize = fetchSize;
-    this.timeout = timeout;
+    this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableCacheLeader = enableCacheLeader;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
   }
 
   // if this method throws an exception, either the server is broken, or the ip/port/user/password
@@ -204,7 +215,7 @@ public class SessionPool {
             logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
           }
           this.wait(1000);
-          long time = timeout < 60_000 ? timeout : 60_000;
+          long time = waitToGetSessionTimeoutInMs < 60_000 ? waitToGetSessionTimeoutInMs : 60_000;
           if (System.currentTimeMillis() - start > time) {
             logger.warn(
                 "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
@@ -218,7 +229,7 @@ public class SessionPool {
                 occupied.size(),
                 queue.size(),
                 size);
-            if (System.currentTimeMillis() - start > timeout) {
+            if (System.currentTimeMillis() - start > waitToGetSessionTimeoutInMs) {
               throw new IoTDBConnectionException(
                   String.format("timeout to get a connection from %s:%s", ip, port));
             }
@@ -243,7 +254,7 @@ public class SessionPool {
       }
       session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
       try {
-        session.open(enableCompression);
+        session.open(enableCompression, connectionTimeoutInMs);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -339,7 +350,7 @@ public class SessionPool {
   private void tryConstructNewSession() {
     Session session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
     try {
-      session.open(enableCompression);
+      session.open(enableCompression, connectionTimeoutInMs);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 6b7d68f..18e182e 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Config;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.junit.After;
@@ -227,7 +228,18 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            6000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     SessionDataSetWrapper wrapper = null;
     try {
@@ -243,7 +255,19 @@ public class SessionPoolTest {
       EnvironmentUtils.stopDaemon();
 
       EnvironmentUtils.reactiveDaemon();
-      pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+      pool =
+          new SessionPool(
+              "127.0.0.1",
+              6667,
+              "root",
+              "root",
+              3,
+              1,
+              6000,
+              false,
+              null,
+              false,
+              Config.DEFAULT_CONNECTION_TIMEOUT_MS);
       correctQuery(pool);
       pool.close();
       return;
@@ -263,7 +287,19 @@ public class SessionPoolTest {
         pool.close();
         EnvironmentUtils.stopDaemon();
         EnvironmentUtils.reactiveDaemon();
-        pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+        pool =
+            new SessionPool(
+                "127.0.0.1",
+                6667,
+                "root",
+                "root",
+                3,
+                1,
+                6000,
+                false,
+                null,
+                false,
+                Config.DEFAULT_CONNECTION_TIMEOUT_MS);
         correctQuery(pool);
         pool.close();
       } catch (StatementExecutionException es) {
@@ -282,7 +318,18 @@ public class SessionPoolTest {
   @Test
   public void tryIfTheServerIsRestartButDataIsGotten() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     assertEquals(1, pool.currentAvailableSize());
     SessionDataSetWrapper wrapper = null;
@@ -308,12 +355,35 @@ public class SessionPoolTest {
   @Test
   public void restart() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     write10Data(pool, true);
     // stop the server.
     pool.close();
     EnvironmentUtils.stopDaemon();
-    pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+    pool =
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            1,
+            1,
+            1000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     // all this ten data will fail.
     write10Data(pool, false);
     // restart the server
@@ -343,7 +413,18 @@ public class SessionPoolTest {
   @Test
   public void testClose() {
     SessionPool pool =
-        new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+        new SessionPool(
+            "127.0.0.1",
+            6667,
+            "root",
+            "root",
+            3,
+            1,
+            60000,
+            false,
+            null,
+            false,
+            Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     pool.close();
     try {
       pool.insertRecord(