You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/02/13 07:24:17 UTC
[iotdb] 01/02: [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/new-rc1.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 771f2b69f803155d6fa9b4f76622858b6cfc179b
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Feb 8 19:44:41 2023 +0800
[IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)
---
.../java/org/apache/iotdb/isession/ISession.java | 7 +++++
.../java/org/apache/iotdb/session/Session.java | 31 ++++++++++++++++++++--
.../org/apache/iotdb/session/pool/SessionPool.java | 16 +++++++++--
3 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 922e78d0bb..4ab78f128f 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.isession;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.SystemStatus;
import org.apache.iotdb.isession.util.Version;
@@ -54,6 +55,12 @@ public interface ISession extends AutoCloseable {
void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException;
+ void open(
+ boolean enableRPCCompression,
+ int connectionTimeoutInMs,
+ Map<String, TEndPoint> deviceIdToEndpoint)
+ throws IoTDBConnectionException;
+
void close() throws IoTDBConnectionException;
String getTimeZone();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9638238c3d..0298785756 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -404,6 +404,28 @@ public class Session implements ISession {
}
}
+ @Override
+ public synchronized void open(
+ boolean enableRPCCompression,
+ int connectionTimeoutInMs,
+ Map<String, TEndPoint> deviceIdToEndpoint)
+ throws IoTDBConnectionException {
+ if (!isClosed) {
+ return;
+ }
+
+ this.enableRPCCompression = enableRPCCompression;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+ defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+ isClosed = false;
+ if (enableRedirection || enableQueryRedirection) {
+ this.deviceIdToEndpoint = deviceIdToEndpoint;
+ endPointToSessionConnection = new ConcurrentHashMap<>();
+ endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
+ }
+ }
+
@Override
public synchronized void close() throws IoTDBConnectionException {
if (isClosed) {
@@ -920,7 +942,8 @@ public class Session implements ISession {
TEndPoint endPoint;
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
- && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
+ && (endPoint = deviceIdToEndpoint.get(deviceId)) != null
+ && endPointToSessionConnection.containsKey(endPoint)) {
return endPointToSessionConnection.get(endPoint);
} else {
return defaultSessionConnection;
@@ -965,7 +988,10 @@ public class Session implements ISession {
return;
}
AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
- deviceIdToEndpoint.put(deviceId, endpoint);
+ if (!deviceIdToEndpoint.containsKey(deviceId)
+ || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+ deviceIdToEndpoint.put(deviceId, endpoint);
+ }
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
endpoint,
@@ -3259,6 +3285,7 @@ public class Session implements ISession {
completableFuture.join();
} catch (CompletionException completionException) {
Throwable cause = completionException.getCause();
+ logger.error("Meet error when async insert!", cause);
if (cause instanceof IoTDBConnectionException) {
throw (IoTDBConnectionException) cause;
} else {
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 12661ec21d..ed89b085bf 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.session.pool;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.ISessionDataSet;
import org.apache.iotdb.isession.SessionConfig;
@@ -93,6 +94,8 @@ public class SessionPool implements ISessionPool {
private boolean enableRedirection;
private boolean enableQueryRedirection = false;
+ private Map<String, TEndPoint> deviceIdToEndpoint;
+
private int thriftDefaultBufferSize;
private int thriftMaxFrameSize;
@@ -299,6 +302,9 @@ public class SessionPool implements ISessionPool {
this.enableCompression = enableCompression;
this.zoneId = zoneId;
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -330,6 +336,9 @@ public class SessionPool implements ISessionPool {
this.enableCompression = enableCompression;
this.zoneId = zoneId;
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -448,7 +457,7 @@ public class SessionPool implements ISessionPool {
session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs);
+ session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -548,7 +557,7 @@ public class SessionPool implements ISessionPool {
private void tryConstructNewSession() {
Session session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs);
+ session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -2639,6 +2648,9 @@ public class SessionPool implements ISessionPool {
@Override
public void setEnableRedirection(boolean enableRedirection) {
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
for (ISession session : queue) {
session.setEnableRedirection(enableRedirection);
}