You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/11/14 09:38:48 UTC

[iotdb] 01/01: fix session pool bug when someone call pool.close

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch fix_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 011c4346cbfc8770330f532b3f8e396738dca74a
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Nov 14 17:38:10 2020 +0800

    fix session pool bug when someone call pool.close
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 119 ++++++++++++++++-----
 .../apache/iotdb/session/pool/SessionPoolTest.java |   7 ++
 2 files changed, 99 insertions(+), 27 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index a80ec41..48f278f 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -76,6 +76,8 @@ public class SessionPool {
   private boolean enableCompression = false;
   private ZoneId zoneId;
 
+  private boolean closed;//whether the queue is closed.
+
   public SessionPool(String ip, int port, String user, String password, int maxSize) {
     this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE, 60_000, false, null);
   }
@@ -110,20 +112,64 @@ public class SessionPool {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private Session getSession() throws IoTDBConnectionException {
     Session session = queue.poll();
+    if (closed) {
+      throw new IoTDBConnectionException("Session pool is closed");
+    }
     if (session != null) {
       return session;
     } else {
+      long start = System.currentTimeMillis();
+      boolean canCreate = false;
       synchronized (this) {
-        long start = System.currentTimeMillis();
+        if (size < maxSize) {
+          //we can create more session
+          size++;
+          canCreate = true;
+          //but we do it after skip synchronized block because connection a session is time consuming.
+        }
+      }
+      if (canCreate) {
+        //create a new one.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+        }
+        session = new Session(ip, port, user, password, fetchSize, zoneId);
+        try {
+          session.open(enableCompression);
+          //avoid someone has called close() the session pool
+          synchronized (this) {
+            if (closed) {
+              //have to release the connection...
+              session.close();
+              throw new IoTDBConnectionException("Session pool is closed");
+            } else {
+              return session;
+            }
+          }
+        } catch (IoTDBConnectionException e) {
+          //if exception, we will throw the exception.
+          //Meanwhile, we have to set size--
+          synchronized (this) {
+            size--;
+            this.notifyAll();
+            if (logger.isDebugEnabled()) {
+              logger.debug("open session failed, reduce the count and notify others...");
+            }
+          }
+          throw e;
+        }
+      }
+      else {
         while (session == null) {
-          if (size < maxSize) {
-            //we can create more session
-            size++;
-            //but we do it after skip synchronized block because connection a session is time consuming.
-            break;
-          } else {
+          if (closed) {
+            throw new IoTDBConnectionException("Session pool is closed");
+          }
+          synchronized (this) {
             //we have to wait for someone returns a session.
             try {
+              if (logger.isDebugEnabled()) {
+                logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
+              }
               this.wait(1000);
               long time = timeout < 60_000 ? timeout : 60_000;
               if (System.currentTimeMillis() - start > time) {
@@ -144,25 +190,8 @@ public class SessionPool {
             session = queue.poll();
           }
         }
-        if (session != null) {
-          return session;
-        }
-      }
-      if (logger.isDebugEnabled()) {
-        logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+        return session;
       }
-      session = new Session(ip, port, user, password, fetchSize, zoneId);
-      try {
-        session.open(enableCompression);
-      } catch (IoTDBConnectionException e) {
-        //if exception, we will throw the exception.
-        //Meanwhile, we have to set size--
-        synchronized (this) {
-          size--;
-        }
-        throw e;
-      }
-      return session;
     }
   }
 
@@ -178,6 +207,10 @@ public class SessionPool {
     queue.push(session);
     synchronized (this) {
       this.notifyAll();
+      //comment the following codes as putBack is too frequently called.
+//      if (logger.isTraceEnabled()) {
+//        logger.trace("put a session back and notify others..., queue.size = {}", queue.size());
+//      }
     }
   }
 
@@ -194,6 +227,7 @@ public class SessionPool {
         session.close();
       } catch (IoTDBConnectionException e) {
         //do nothing
+        logger.warn("close the session failed.", e);
       }
     }
     for (Session session : occupied.keySet()) {
@@ -201,8 +235,11 @@ public class SessionPool {
         session.close();
       } catch (IoTDBConnectionException e) {
         //do nothing
+        logger.warn("close the session failed.", e);
       }
     }
+    logger.info("closing the session pool, cleaning queues...");
+    this.closed = true;
     queue.clear();
     occupied.clear();
   }
@@ -223,10 +260,12 @@ public class SessionPool {
   }
 
   private synchronized void removeSession() {
+    logger.warn("Remove a broken Session {}, {}, {}", ip, port, user);
+    size--;
+    this.notifyAll();
     if (logger.isDebugEnabled()) {
-      logger.debug("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+        logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
     }
-    size--;
   }
 
   private void closeSession(Session session) {
@@ -235,6 +274,7 @@ public class SessionPool {
         session.close();
       } catch (Exception e2) {
         //do nothing. We just want to guarantee the session is closed.
+        logger.warn("close the session failed.", e2);
       }
     }
   }
@@ -296,6 +336,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertTablet failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -330,6 +371,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertTablets failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -356,6 +398,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertRecords failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -383,6 +426,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertRecords failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -409,6 +453,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertRecord failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -435,6 +480,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertRecord failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -457,6 +503,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertTablet failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -479,6 +526,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertTablets failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -502,6 +550,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertRecords failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -526,6 +575,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertRecords failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -548,6 +598,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertRecord failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -571,6 +622,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertRecord failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -594,6 +646,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -617,6 +670,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -641,6 +695,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteData failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -665,6 +720,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteData failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -683,6 +739,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("setStorageGroup failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -701,6 +758,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteStorageGroup failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -719,6 +777,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteStorageGroups failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -737,6 +796,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("createTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -758,6 +818,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("createTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -780,6 +841,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("createMultiTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -798,6 +860,7 @@ public class SessionPool {
         return resp;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("checkTimeseriesExists failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -828,6 +891,7 @@ public class SessionPool {
         return wrapper;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("executeQueryStatement failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -853,6 +917,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("executeNonQueryStatement failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 9076d19..fbca8e4 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -237,4 +237,11 @@ public class SessionPoolTest {
     }
   }
 
+  @Test
+  public void testClose() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null);
+    write10Data(pool, true);
+
+  }
+
 }
\ No newline at end of file