You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/24 03:42:26 UTC
[iotdb] 04/04: add parameter connectionTimeoutInMs for SessionPool
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch session-pool-optimization-0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eb4aa5393a8071374bdbffbd3d8bbc91463955ad
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 11:41:43 2021 +0800
add parameter connectionTimeoutInMs for SessionPool
---
.../java/org/apache/iotdb/session/Session.java | 2 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 59 ++++++++------
.../apache/iotdb/session/pool/SessionPoolTest.java | 95 ++++++++++++++++++++--
3 files changed, 124 insertions(+), 32 deletions(-)
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 20079c9..06b3536 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -314,7 +314,7 @@ public class Session {
open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
- private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+ public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException {
if (!isClosed) {
return;
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index d8b2991..5fa2c7e 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -64,22 +64,27 @@ public class SessionPool {
private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
public static final String CLOSE_THE_SESSION_FAILED = "close the session failed.";
- private static int RETRY = 3;
- private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+
+ private static final int RETRY = 3;
+ private static final int FINAL_RETRY = RETRY - 1;
+
+ private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
// for session whose resultSet is not released.
- private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
private int size = 0;
private int maxSize = 0;
- private String ip;
- private int port;
- private String user;
- private String password;
- private int fetchSize;
- private long timeout; // ms
- private static int FINAL_RETRY = RETRY - 1;
- private boolean enableCompression;
- private boolean enableCacheLeader;
- private ZoneId zoneId;
+
+ private final String ip;
+ private final int port;
+ private final String user;
+ private final String password;
+ private final int fetchSize;
+ private final long waitToGetSessionTimeoutInMs;
+ private final int connectionTimeoutInMs;
+ private final boolean enableCompression;
+ private final boolean enableCacheLeader;
+ private final ZoneId zoneId;
private boolean closed; // whether the queue is closed.
@@ -94,7 +99,8 @@ public class SessionPool {
60_000,
false,
null,
- Config.DEFAULT_CACHE_LEADER_MODE);
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public SessionPool(
@@ -109,7 +115,8 @@ public class SessionPool {
60_000,
enableCompression,
null,
- Config.DEFAULT_CACHE_LEADER_MODE);
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public SessionPool(
@@ -130,7 +137,8 @@ public class SessionPool {
60_000,
enableCompression,
null,
- enableCacheLeader);
+ enableCacheLeader,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public SessionPool(
@@ -145,7 +153,8 @@ public class SessionPool {
60_000,
false,
zoneId,
- Config.DEFAULT_CACHE_LEADER_MODE);
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@SuppressWarnings("squid:S107")
@@ -156,20 +165,22 @@ public class SessionPool {
String password,
int maxSize,
int fetchSize,
- long timeout,
+ long waitToGetSessionTimeoutInMs,
boolean enableCompression,
ZoneId zoneId,
- boolean enableCacheLeader) {
+ boolean enableCacheLeader,
+ int connectionTimeoutInMs) {
this.maxSize = maxSize;
this.ip = ip;
this.port = port;
this.user = user;
this.password = password;
this.fetchSize = fetchSize;
- this.timeout = timeout;
+ this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
this.enableCompression = enableCompression;
this.zoneId = zoneId;
this.enableCacheLeader = enableCacheLeader;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
}
// if this method throws an exception, either the server is broken, or the ip/port/user/password
@@ -204,7 +215,7 @@ public class SessionPool {
logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
}
this.wait(1000);
- long time = timeout < 60_000 ? timeout : 60_000;
+ long time = waitToGetSessionTimeoutInMs < 60_000 ? waitToGetSessionTimeoutInMs : 60_000;
if (System.currentTimeMillis() - start > time) {
logger.warn(
"the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
@@ -218,7 +229,7 @@ public class SessionPool {
occupied.size(),
queue.size(),
size);
- if (System.currentTimeMillis() - start > timeout) {
+ if (System.currentTimeMillis() - start > waitToGetSessionTimeoutInMs) {
throw new IoTDBConnectionException(
String.format("timeout to get a connection from %s:%s", ip, port));
}
@@ -243,7 +254,7 @@ public class SessionPool {
}
session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
try {
- session.open(enableCompression);
+ session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -339,7 +350,7 @@ public class SessionPool {
private void tryConstructNewSession() {
Session session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
try {
- session.open(enableCompression);
+ session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 6b7d68f..18e182e 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Config;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -227,7 +228,18 @@ public class SessionPoolTest {
@Test
public void tryIfTheServerIsRestart() {
SessionPool pool =
- new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 6000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
write10Data(pool, true);
SessionDataSetWrapper wrapper = null;
try {
@@ -243,7 +255,19 @@ public class SessionPoolTest {
EnvironmentUtils.stopDaemon();
EnvironmentUtils.reactiveDaemon();
- pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+ pool =
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 6000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
correctQuery(pool);
pool.close();
return;
@@ -263,7 +287,19 @@ public class SessionPoolTest {
pool.close();
EnvironmentUtils.stopDaemon();
EnvironmentUtils.reactiveDaemon();
- pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+ pool =
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 6000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
correctQuery(pool);
pool.close();
} catch (StatementExecutionException es) {
@@ -282,7 +318,18 @@ public class SessionPoolTest {
@Test
public void tryIfTheServerIsRestartButDataIsGotten() {
SessionPool pool =
- new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 60000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
write10Data(pool, true);
assertEquals(1, pool.currentAvailableSize());
SessionDataSetWrapper wrapper = null;
@@ -308,12 +355,35 @@ public class SessionPoolTest {
@Test
public void restart() {
SessionPool pool =
- new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 1,
+ 1,
+ 1000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
write10Data(pool, true);
// stop the server.
pool.close();
EnvironmentUtils.stopDaemon();
- pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+ pool =
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 1,
+ 1,
+ 1000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
// all this ten data will fail.
write10Data(pool, false);
// restart the server
@@ -343,7 +413,18 @@ public class SessionPoolTest {
@Test
public void testClose() {
SessionPool pool =
- new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 60000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
pool.close();
try {
pool.insertRecord(