You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/24 08:19:44 UTC
[iotdb] 04/05: add parameter connectionTimeoutInMs for SessionPool
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch session-pool-optimization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 815b4c23c752991ec2e9c7daddf0ac56d0a489c7
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Aug 24 16:17:37 2021 +0800
add parameter connectionTimeoutInMs for SessionPool
---
.../java/org/apache/iotdb/session/Session.java | 2 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 85 ++++++++++-------
.../apache/iotdb/session/pool/SessionPoolTest.java | 103 +++++++++++++++++++--
3 files changed, 148 insertions(+), 42 deletions(-)
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 839d6d6..b28afcf 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -317,7 +317,7 @@ public class Session {
open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
- private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+ public synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException {
if (!isClosed) {
return;
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index e4b712d..1beb0c5 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -64,22 +64,27 @@ public class SessionPool {
private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
public static final String CLOSE_THE_SESSION_FAILED = "close the session failed.";
- private static int RETRY = 3;
- private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+
+ private static final int RETRY = 3;
+ private static final int FINAL_RETRY = RETRY - 1;
+
+ private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
// for session whose resultSet is not released.
- private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
private int size = 0;
private int maxSize = 0;
- private String host;
- private int port;
- private String user;
- private String password;
- private int fetchSize;
- private long timeout; // ms
- private static int FINAL_RETRY = RETRY - 1;
- private boolean enableCompression;
- private boolean enableCacheLeader;
- private ZoneId zoneId;
+
+ private final String host;
+ private final int port;
+ private final String user;
+ private final String password;
+ private final int fetchSize;
+ private final long waitToGetSessionTimeoutInMs;
+ private final int connectionTimeoutInMs;
+ private final boolean enableCompression;
+ private final boolean enableCacheLeader;
+ private final ZoneId zoneId;
private boolean closed; // whether the queue is closed.
@@ -94,7 +99,8 @@ public class SessionPool {
60_000,
false,
null,
- Config.DEFAULT_CACHE_LEADER_MODE);
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public SessionPool(
@@ -109,7 +115,8 @@ public class SessionPool {
60_000,
enableCompression,
null,
- Config.DEFAULT_CACHE_LEADER_MODE);
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public SessionPool(
@@ -130,7 +137,8 @@ public class SessionPool {
60_000,
enableCompression,
null,
- enableCacheLeader);
+ enableCacheLeader,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public SessionPool(
@@ -145,7 +153,8 @@ public class SessionPool {
60_000,
false,
zoneId,
- Config.DEFAULT_CACHE_LEADER_MODE);
+ Config.DEFAULT_CACHE_LEADER_MODE,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
@SuppressWarnings("squid:S107")
@@ -156,20 +165,22 @@ public class SessionPool {
String password,
int maxSize,
int fetchSize,
- long timeout,
+ long waitToGetSessionTimeoutInMs,
boolean enableCompression,
ZoneId zoneId,
- boolean enableCacheLeader) {
+ boolean enableCacheLeader,
+ int connectionTimeoutInMs) {
this.maxSize = maxSize;
this.host = host;
this.port = port;
this.user = user;
this.password = password;
this.fetchSize = fetchSize;
- this.timeout = timeout;
+ this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
this.enableCompression = enableCompression;
this.zoneId = zoneId;
this.enableCacheLeader = enableCacheLeader;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
}
// if this method throws an exception, either the server is broken, or the ip/port/user/password
@@ -204,7 +215,7 @@ public class SessionPool {
logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
}
this.wait(1000);
- long time = timeout < 60_000 ? timeout : 60_000;
+ long time = waitToGetSessionTimeoutInMs < 60_000 ? waitToGetSessionTimeoutInMs : 60_000;
if (System.currentTimeMillis() - start > time) {
logger.warn(
"the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
@@ -218,7 +229,7 @@ public class SessionPool {
occupied.size(),
queue.size(),
size);
- if (System.currentTimeMillis() - start > timeout) {
+ if (System.currentTimeMillis() - start > waitToGetSessionTimeoutInMs) {
throw new IoTDBConnectionException(
String.format("timeout to get a connection from %s:%s", host, port));
}
@@ -243,7 +254,7 @@ public class SessionPool {
}
session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
try {
- session.open(enableCompression);
+ session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -339,7 +350,7 @@ public class SessionPool {
private void tryConstructNewSession() {
Session session = new Session(host, port, user, password, fetchSize, zoneId, enableCacheLeader);
try {
- session.open(enableCompression);
+ session.open(enableCompression, connectionTimeoutInMs);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -1206,8 +1217,8 @@ public class SessionPool {
return zoneId;
}
- public long getTimeout() {
- return timeout;
+ public long getWaitToGetSessionTimeoutInMs() {
+ return waitToGetSessionTimeoutInMs;
}
public boolean isEnableCompression() {
@@ -1218,17 +1229,23 @@ public class SessionPool {
return enableCacheLeader;
}
+ public int getConnectionTimeoutInMs() {
+ return connectionTimeoutInMs;
+ }
+
public static class Builder {
+
private String host = Config.DEFAULT_HOST;
private int port = Config.DEFAULT_PORT;
private int maxSize = Config.DEFAULT_SESSION_POOL_MAX_SIZE;
private String user = Config.DEFAULT_USER;
private String password = Config.DEFAULT_PASSWORD;
private int fetchSize = Config.DEFAULT_FETCH_SIZE;
- private long timeout = 60_000;
+ private long waitToGetSessionTimeoutInMs = 60_000;
private boolean enableCompression = false;
private ZoneId zoneId = null;
private boolean enableCacheLeader = Config.DEFAULT_CACHE_LEADER_MODE;
+ private int connectionTimeoutInMs = Config.DEFAULT_CONNECTION_TIMEOUT_MS;
public Builder host(String host) {
this.host = host;
@@ -1265,8 +1282,8 @@ public class SessionPool {
return this;
}
- public Builder timeout(long timeout) {
- this.timeout = timeout;
+ public Builder waitToGetSessionTimeoutInMs(long waitToGetSessionTimeoutInMs) {
+ this.waitToGetSessionTimeoutInMs = waitToGetSessionTimeoutInMs;
return this;
}
@@ -1280,6 +1297,11 @@ public class SessionPool {
return this;
}
+ public Builder connectionTimeoutInMs(int connectionTimeoutInMs) {
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ return this;
+ }
+
public SessionPool build() {
return new SessionPool(
host,
@@ -1288,10 +1310,11 @@ public class SessionPool {
password,
maxSize,
fetchSize,
- timeout,
+ waitToGetSessionTimeoutInMs,
enableCompression,
zoneId,
- enableCacheLeader);
+ enableCacheLeader,
+ connectionTimeoutInMs);
}
}
}
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 9116070..dc39d47 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Config;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -228,7 +229,18 @@ public class SessionPoolTest {
@Test
public void tryIfTheServerIsRestart() {
SessionPool pool =
- new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 6000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
write10Data(pool, true);
SessionDataSetWrapper wrapper = null;
try {
@@ -244,7 +256,19 @@ public class SessionPoolTest {
EnvironmentUtils.stopDaemon();
EnvironmentUtils.reactiveDaemon();
- pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+ pool =
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 6000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
correctQuery(pool);
pool.close();
return;
@@ -264,7 +288,19 @@ public class SessionPoolTest {
pool.close();
EnvironmentUtils.stopDaemon();
EnvironmentUtils.reactiveDaemon();
- pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
+ pool =
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 6000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
correctQuery(pool);
pool.close();
} catch (StatementExecutionException es) {
@@ -283,10 +319,21 @@ public class SessionPoolTest {
@Test
public void tryIfTheServerIsRestartButDataIsGotten() {
SessionPool pool =
- new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 60000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
write10Data(pool, true);
assertEquals(1, pool.currentAvailableSize());
- SessionDataSetWrapper wrapper = null;
+ SessionDataSetWrapper wrapper;
try {
wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
// user does not know what happens.
@@ -309,12 +356,35 @@ public class SessionPoolTest {
@Test
public void restart() {
SessionPool pool =
- new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 1,
+ 1,
+ 1000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
write10Data(pool, true);
// stop the server.
pool.close();
EnvironmentUtils.stopDaemon();
- pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
+ pool =
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 1,
+ 1,
+ 1000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
// all this ten data will fail.
write10Data(pool, false);
// restart the server
@@ -344,7 +414,18 @@ public class SessionPoolTest {
@Test
public void testClose() {
SessionPool pool =
- new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null, false);
+ new SessionPool(
+ "127.0.0.1",
+ 6667,
+ "root",
+ "root",
+ 3,
+ 1,
+ 60000,
+ false,
+ null,
+ false,
+ Config.DEFAULT_CONNECTION_TIMEOUT_MS);
pool.close();
try {
pool.insertRecord(
@@ -373,10 +454,11 @@ public class SessionPoolTest {
.user("abc")
.password("123")
.fetchSize(1)
- .timeout(2)
+ .waitToGetSessionTimeoutInMs(2)
.enableCacheLeader(true)
.enableCompression(true)
.zoneId(ZoneOffset.UTC)
+ .connectionTimeoutInMs(3)
.build();
assertEquals("localhost", pool.getHost());
@@ -385,9 +467,10 @@ public class SessionPoolTest {
assertEquals("123", pool.getPassword());
assertEquals(10, pool.getMaxSize());
assertEquals(1, pool.getFetchSize());
- assertEquals(2, pool.getTimeout());
+ assertEquals(2, pool.getWaitToGetSessionTimeoutInMs());
assertTrue(pool.isEnableCacheLeader());
assertTrue(pool.isEnableCompression());
+ assertEquals(3, pool.getConnectionTimeoutInMs());
assertEquals(ZoneOffset.UTC, pool.getZoneId());
}
}