You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/08/19 13:19:46 UTC
[iotdb] 01/01: optimize session creation
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch session-pool-optimize
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 049c0984683e600464d7e0d5f4cb2276bdc0688c
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Thu Aug 19 19:43:02 2021 +0800
optimize session creation
---
.../org/apache/iotdb/session/pool/SessionPool.java | 140 ++++++++++-----------
1 file changed, 70 insertions(+), 70 deletions(-)
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 79a539d..ed04509 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -174,7 +174,6 @@ public class SessionPool {
// if this method throws an exception, either the server is broken, or the ip/port/user/password
// is incorrect.
-
@SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
private Session getSession() throws IoTDBConnectionException {
Session session = queue.poll();
@@ -183,91 +182,92 @@ public class SessionPool {
}
if (session != null) {
return session;
- } else {
- long start = System.currentTimeMillis();
- boolean canCreate = false;
+ }
+
+ boolean shouldCreate = false;
+
+ long start = System.currentTimeMillis();
+ while (session == null) {
synchronized (this) {
if (size < maxSize) {
// we can create more session
size++;
- canCreate = true;
+ shouldCreate = true;
// but we do it after skip synchronized block because connection a session is time
// consuming.
+ break;
}
- }
- if (canCreate) {
- // create a new one.
- if (logger.isDebugEnabled()) {
- logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
- }
- session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+
+ // we have to wait for someone returns a session.
try {
- session.open(enableCompression);
- // avoid someone has called close() the session pool
- synchronized (this) {
- if (closed) {
- // have to release the connection...
- session.close();
- throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
- } else {
- return session;
- }
+ if (logger.isDebugEnabled()) {
+ logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
}
- } catch (IoTDBConnectionException e) {
- // if exception, we will throw the exception.
- // Meanwhile, we have to set size--
- synchronized (this) {
- size--;
- // we do not need to notifyAll as any waited thread can continue to work after waked up.
- this.notify();
- if (logger.isDebugEnabled()) {
- logger.debug("open session failed, reduce the count and notify others...");
+ this.wait(1000);
+ long time = timeout < 60_000 ? timeout : 60_000;
+ if (System.currentTimeMillis() - start > time) {
+ logger.warn(
+ "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+ (System.currentTimeMillis() - start) / 1000,
+ ip,
+ port,
+ user,
+ password);
+ logger.warn(
+ "current occupied size {}, queue size {}, considered size {} ",
+ occupied.size(),
+ queue.size(),
+ size);
+ if (System.currentTimeMillis() - start > timeout) {
+ throw new IoTDBConnectionException(
+ String.format("timeout to get a connection from %s:%s", ip, port));
}
}
- throw e;
+ } catch (InterruptedException e) {
+ logger.error("the SessionPool is damaged", e);
+ Thread.currentThread().interrupt();
}
- } else {
- while (session == null) {
- synchronized (this) {
- if (closed) {
- throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
- }
- // we have to wait for someone returns a session.
- try {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "no more sessions can be created, wait... queue.size={}", queue.size());
- }
- this.wait(1000);
- long time = timeout < 60_000 ? timeout : 60_000;
- if (System.currentTimeMillis() - start > time) {
- logger.warn(
- "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
- (System.currentTimeMillis() - start) / 1000,
- ip,
- port,
- user,
- password);
- logger.warn(
- "current occupied size {}, queue size {}, considered size {} ",
- occupied.size(),
- queue.size(),
- size);
- if (System.currentTimeMillis() - start > timeout) {
- throw new IoTDBConnectionException(
- String.format("timeout to get a connection from %s:%s", ip, port));
- }
- }
- } catch (InterruptedException e) {
- logger.error("the SessionPool is damaged", e);
- Thread.currentThread().interrupt();
- }
- session = queue.poll();
+
+ session = queue.poll();
+
+ if (closed) {
+ throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+ }
+ }
+ }
+
+ if (shouldCreate) {
+ // create a new one.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+ }
+ session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
+ try {
+ session.open(enableCompression);
+ // avoid someone has called close() the session pool
+ synchronized (this) {
+ if (closed) {
+ // have to release the connection...
+ session.close();
+ throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
}
}
- return session;
+ } catch (IoTDBConnectionException e) {
+ // if exception, we will throw the exception.
+ // Meanwhile, we have to set size--
+ synchronized (this) {
+ size--;
+ // we do not need to notifyAll as any waited thread can continue to work after waked up.
+ this.notify();
+ if (logger.isDebugEnabled()) {
+ logger.debug("open session failed, reduce the count and notify others...");
+ }
+ }
+ throw e;
}
}
+
+ return session;
}
public int currentAvailableSize() {