You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/11/14 09:38:48 UTC
[iotdb] 01/01: fix session pool bug when someone call pool.close
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch fix_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 011c4346cbfc8770330f532b3f8e396738dca74a
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Nov 14 17:38:10 2020 +0800
fix session pool bug when someone call pool.close
---
.../org/apache/iotdb/session/pool/SessionPool.java | 119 ++++++++++++++++-----
.../apache/iotdb/session/pool/SessionPoolTest.java | 7 ++
2 files changed, 99 insertions(+), 27 deletions(-)
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index a80ec41..48f278f 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -76,6 +76,8 @@ public class SessionPool {
private boolean enableCompression = false;
private ZoneId zoneId;
+ private boolean closed;//whether the session pool has been closed.
+
public SessionPool(String ip, int port, String user, String password, int maxSize) {
this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE, 60_000, false, null);
}
@@ -110,20 +112,64 @@ public class SessionPool {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private Session getSession() throws IoTDBConnectionException {
Session session = queue.poll();
+ if (closed) {
+ throw new IoTDBConnectionException("Session pool is closed");
+ }
if (session != null) {
return session;
} else {
+ long start = System.currentTimeMillis();
+ boolean canCreate = false;
synchronized (this) {
- long start = System.currentTimeMillis();
+ if (size < maxSize) {
+ //we can create more session
+ size++;
+ canCreate = true;
+ //but we open it after leaving the synchronized block because connecting a session is time consuming.
+ }
+ }
+ if (canCreate) {
+ //create a new one.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+ }
+ session = new Session(ip, port, user, password, fetchSize, zoneId);
+ try {
+ session.open(enableCompression);
+ //guard against someone having called close() on the session pool while this session was being opened
+ synchronized (this) {
+ if (closed) {
+ //have to release the connection...
+ session.close();
+ throw new IoTDBConnectionException("Session pool is closed");
+ } else {
+ return session;
+ }
+ }
+ } catch (IoTDBConnectionException e) {
+ //if exception, we will throw the exception.
+ //Meanwhile, we have to set size--
+ synchronized (this) {
+ size--;
+ this.notifyAll();
+ if (logger.isDebugEnabled()) {
+ logger.debug("open session failed, reduce the count and notify others...");
+ }
+ }
+ throw e;
+ }
+ }
+ else {
while (session == null) {
- if (size < maxSize) {
- //we can create more session
- size++;
- //but we do it after skip synchronized block because connection a session is time consuming.
- break;
- } else {
+ if (closed) {
+ throw new IoTDBConnectionException("Session pool is closed");
+ }
+ synchronized (this) {
//we have to wait for someone returns a session.
try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
+ }
this.wait(1000);
long time = timeout < 60_000 ? timeout : 60_000;
if (System.currentTimeMillis() - start > time) {
@@ -144,25 +190,8 @@ public class SessionPool {
session = queue.poll();
}
}
- if (session != null) {
- return session;
- }
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+ return session;
}
- session = new Session(ip, port, user, password, fetchSize, zoneId);
- try {
- session.open(enableCompression);
- } catch (IoTDBConnectionException e) {
- //if exception, we will throw the exception.
- //Meanwhile, we have to set size--
- synchronized (this) {
- size--;
- }
- throw e;
- }
- return session;
}
}
@@ -178,6 +207,10 @@ public class SessionPool {
queue.push(session);
synchronized (this) {
this.notifyAll();
+ //the following logging code is commented out because putBack is called too frequently.
+// if (logger.isTraceEnabled()) {
+// logger.trace("put a session back and notify others..., queue.size = {}", queue.size());
+// }
}
}
@@ -194,6 +227,7 @@ public class SessionPool {
session.close();
} catch (IoTDBConnectionException e) {
//do nothing
+ logger.warn("close the session failed.", e);
}
}
for (Session session : occupied.keySet()) {
@@ -201,8 +235,11 @@ public class SessionPool {
session.close();
} catch (IoTDBConnectionException e) {
//do nothing
+ logger.warn("close the session failed.", e);
}
}
+ logger.info("closing the session pool, cleaning queues...");
+ this.closed = true;
queue.clear();
occupied.clear();
}
@@ -223,10 +260,12 @@ public class SessionPool {
}
private synchronized void removeSession() {
+ logger.warn("Remove a broken Session {}, {}, {}", ip, port, user);
+ size--;
+ this.notifyAll();
if (logger.isDebugEnabled()) {
- logger.debug("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+ logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
}
- size--;
}
private void closeSession(Session session) {
@@ -235,6 +274,7 @@ public class SessionPool {
session.close();
} catch (Exception e2) {
//do nothing. We just want to guarantee the session is closed.
+ logger.warn("close the session failed.", e2);
}
}
}
@@ -296,6 +336,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("insertTablet failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -330,6 +371,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("insertTablets failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -356,6 +398,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("insertRecords failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -383,6 +426,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("insertRecords failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -409,6 +453,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("insertRecord failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -435,6 +480,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("insertRecord failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -457,6 +503,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("testInsertTablet failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -479,6 +526,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("testInsertTablets failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -502,6 +550,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("testInsertRecords failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -526,6 +575,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("testInsertRecords failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -548,6 +598,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("testInsertRecord failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -571,6 +622,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("testInsertRecord failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException | RuntimeException e) {
putBack(session);
@@ -594,6 +646,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("deleteTimeseries failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -617,6 +670,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("deleteTimeseries failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -641,6 +695,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("deleteData failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -665,6 +720,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("deleteData failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -683,6 +739,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("setStorageGroup failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -701,6 +758,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("deleteStorageGroup failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -719,6 +777,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("deleteStorageGroups failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -737,6 +796,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("createTimeseries failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -758,6 +818,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("createTimeseries failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -780,6 +841,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("createMultiTimeseries failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -798,6 +860,7 @@ public class SessionPool {
return resp;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("checkTimeseriesExists failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -828,6 +891,7 @@ public class SessionPool {
return wrapper;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("executeQueryStatement failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
@@ -853,6 +917,7 @@ public class SessionPool {
return;
} catch (IoTDBConnectionException e) {
// TException means the connection is broken, remove it and get a new one.
+ logger.warn("executeNonQueryStatement failed", e);
cleanSessionAndMayThrowConnectionException(session, i, e);
} catch (StatementExecutionException e) {
putBack(session);
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 9076d19..fbca8e4 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -237,4 +237,11 @@ public class SessionPoolTest {
}
}
+ @Test
+ public void testClose() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null);
+ write10Data(pool, true);
+
+ }
+
}
\ No newline at end of file