You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/02/13 07:24:17 UTC

[iotdb] 01/02: [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/new-rc1.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 771f2b69f803155d6fa9b4f76622858b6cfc179b
Author: Haonan <hh...@outlook.com>
AuthorDate: Wed Feb 8 19:44:41 2023 +0800

    [IOTDB-5498] Fix SessionPool OOM when the numbers of devices and sessions are large (#9017)
---
 .../java/org/apache/iotdb/isession/ISession.java   |  7 +++++
 .../java/org/apache/iotdb/session/Session.java     | 31 ++++++++++++++++++++--
 .../org/apache/iotdb/session/pool/SessionPool.java | 16 +++++++++--
 3 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 922e78d0bb..4ab78f128f 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.isession;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.template.Template;
 import org.apache.iotdb.isession.util.SystemStatus;
 import org.apache.iotdb.isession.util.Version;
@@ -54,6 +55,12 @@ public interface ISession extends AutoCloseable {
   void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException;
 
+  void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException;
+
   void close() throws IoTDBConnectionException;
 
   String getTimeZone();
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9638238c3d..0298785756 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -404,6 +404,28 @@ public class Session implements ISession {
     }
   }
 
+  @Override
+  public synchronized void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException {
+    if (!isClosed) {
+      return;
+    }
+
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+    defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+    isClosed = false;
+    if (enableRedirection || enableQueryRedirection) {
+      this.deviceIdToEndpoint = deviceIdToEndpoint;
+      endPointToSessionConnection = new ConcurrentHashMap<>();
+      endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
+    }
+  }
+
   @Override
   public synchronized void close() throws IoTDBConnectionException {
     if (isClosed) {
@@ -920,7 +942,8 @@ public class Session implements ISession {
     TEndPoint endPoint;
     if (enableRedirection
         && !deviceIdToEndpoint.isEmpty()
-        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
+        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null
+        && endPointToSessionConnection.containsKey(endPoint)) {
       return endPointToSessionConnection.get(endPoint);
     } else {
       return defaultSessionConnection;
@@ -965,7 +988,10 @@ public class Session implements ISession {
         return;
       }
       AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
-      deviceIdToEndpoint.put(deviceId, endpoint);
+      if (!deviceIdToEndpoint.containsKey(deviceId)
+          || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+        deviceIdToEndpoint.put(deviceId, endpoint);
+      }
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
               endpoint,
@@ -3259,6 +3285,7 @@ public class Session implements ISession {
         completableFuture.join();
       } catch (CompletionException completionException) {
         Throwable cause = completionException.getCause();
+        logger.error("Meet error when async insert!", cause);
         if (cause instanceof IoTDBConnectionException) {
           throw (IoTDBConnectionException) cause;
         } else {
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 12661ec21d..ed89b085bf 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.session.pool;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.ISessionDataSet;
 import org.apache.iotdb.isession.SessionConfig;
@@ -93,6 +94,8 @@ public class SessionPool implements ISessionPool {
   private boolean enableRedirection;
   private boolean enableQueryRedirection = false;
 
+  private Map<String, TEndPoint> deviceIdToEndpoint;
+
   private int thriftDefaultBufferSize;
   private int thriftMaxFrameSize;
 
@@ -299,6 +302,9 @@ public class SessionPool implements ISessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -330,6 +336,9 @@ public class SessionPool implements ISessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -448,7 +457,7 @@ public class SessionPool implements ISessionPool {
       session = constructNewSession();
 
       try {
-        session.open(enableCompression, connectionTimeoutInMs);
+        session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -548,7 +557,7 @@ public class SessionPool implements ISessionPool {
   private void tryConstructNewSession() {
     Session session = constructNewSession();
     try {
-      session.open(enableCompression, connectionTimeoutInMs);
+      session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -2639,6 +2648,9 @@ public class SessionPool implements ISessionPool {
   @Override
   public void setEnableRedirection(boolean enableRedirection) {
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     for (ISession session : queue) {
       session.setEnableRedirection(enableRedirection);
     }