You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/11/14 09:38:47 UTC

[iotdb] branch fix_session_pool created (now 011c434)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch fix_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 011c434  fix session pool bug when someone call pool.close

This branch includes the following new commits:

     new 011c434  fix session pool bug when someone call pool.close

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: fix session pool bug when someone call pool.close

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch fix_session_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 011c4346cbfc8770330f532b3f8e396738dca74a
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Nov 14 17:38:10 2020 +0800

    fix session pool bug when someone call pool.close
---
 .../org/apache/iotdb/session/pool/SessionPool.java | 119 ++++++++++++++++-----
 .../apache/iotdb/session/pool/SessionPoolTest.java |   7 ++
 2 files changed, 99 insertions(+), 27 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index a80ec41..48f278f 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -76,6 +76,8 @@ public class SessionPool {
   private boolean enableCompression = false;
   private ZoneId zoneId;
 
+  private boolean closed;//whether the queue is closed.
+
   public SessionPool(String ip, int port, String user, String password, int maxSize) {
     this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE, 60_000, false, null);
   }
@@ -110,20 +112,64 @@ public class SessionPool {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private Session getSession() throws IoTDBConnectionException {
     Session session = queue.poll();
+    if (closed) {
+      throw new IoTDBConnectionException("Session pool is closed");
+    }
     if (session != null) {
       return session;
     } else {
+      long start = System.currentTimeMillis();
+      boolean canCreate = false;
       synchronized (this) {
-        long start = System.currentTimeMillis();
+        if (size < maxSize) {
+          //we can create more session
+          size++;
+          canCreate = true;
+          //but we do it after leaving the synchronized block because connecting a session is time-consuming.
+        }
+      }
+      if (canCreate) {
+        //create a new one.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+        }
+        session = new Session(ip, port, user, password, fetchSize, zoneId);
+        try {
+          session.open(enableCompression);
+          //guard against the session pool having been closed while this session was being opened
+          synchronized (this) {
+            if (closed) {
+              //have to release the connection...
+              session.close();
+              throw new IoTDBConnectionException("Session pool is closed");
+            } else {
+              return session;
+            }
+          }
+        } catch (IoTDBConnectionException e) {
+          //if exception, we will throw the exception.
+          //Meanwhile, we have to set size--
+          synchronized (this) {
+            size--;
+            this.notifyAll();
+            if (logger.isDebugEnabled()) {
+              logger.debug("open session failed, reduce the count and notify others...");
+            }
+          }
+          throw e;
+        }
+      }
+      else {
         while (session == null) {
-          if (size < maxSize) {
-            //we can create more session
-            size++;
-            //but we do it after skip synchronized block because connection a session is time consuming.
-            break;
-          } else {
+          if (closed) {
+            throw new IoTDBConnectionException("Session pool is closed");
+          }
+          synchronized (this) {
             //we have to wait for someone returns a session.
             try {
+              if (logger.isDebugEnabled()) {
+                logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
+              }
               this.wait(1000);
               long time = timeout < 60_000 ? timeout : 60_000;
               if (System.currentTimeMillis() - start > time) {
@@ -144,25 +190,8 @@ public class SessionPool {
             session = queue.poll();
           }
         }
-        if (session != null) {
-          return session;
-        }
-      }
-      if (logger.isDebugEnabled()) {
-        logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+        return session;
       }
-      session = new Session(ip, port, user, password, fetchSize, zoneId);
-      try {
-        session.open(enableCompression);
-      } catch (IoTDBConnectionException e) {
-        //if exception, we will throw the exception.
-        //Meanwhile, we have to set size--
-        synchronized (this) {
-          size--;
-        }
-        throw e;
-      }
-      return session;
     }
   }
 
@@ -178,6 +207,10 @@ public class SessionPool {
     queue.push(session);
     synchronized (this) {
       this.notifyAll();
+      //the following code is commented out because putBack is called too frequently.
+//      if (logger.isTraceEnabled()) {
+//        logger.trace("put a session back and notify others..., queue.size = {}", queue.size());
+//      }
     }
   }
 
@@ -194,6 +227,7 @@ public class SessionPool {
         session.close();
       } catch (IoTDBConnectionException e) {
         //do nothing
+        logger.warn("close the session failed.", e);
       }
     }
     for (Session session : occupied.keySet()) {
@@ -201,8 +235,11 @@ public class SessionPool {
         session.close();
       } catch (IoTDBConnectionException e) {
         //do nothing
+        logger.warn("close the session failed.", e);
       }
     }
+    logger.info("closing the session pool, cleaning queues...");
+    this.closed = true;
     queue.clear();
     occupied.clear();
   }
@@ -223,10 +260,12 @@ public class SessionPool {
   }
 
   private synchronized void removeSession() {
+    logger.warn("Remove a broken Session {}, {}, {}", ip, port, user);
+    size--;
+    this.notifyAll();
     if (logger.isDebugEnabled()) {
-      logger.debug("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+        logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
     }
-    size--;
   }
 
   private void closeSession(Session session) {
@@ -235,6 +274,7 @@ public class SessionPool {
         session.close();
       } catch (Exception e2) {
         //do nothing. We just want to guarantee the session is closed.
+        logger.warn("close the session failed.", e2);
       }
     }
   }
@@ -296,6 +336,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertTablet failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -330,6 +371,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertTablets failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -356,6 +398,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertRecords failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -383,6 +426,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertRecords failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -409,6 +453,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertRecord failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -435,6 +480,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("insertRecord failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -457,6 +503,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertTablet failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -479,6 +526,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertTablets failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -502,6 +550,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertRecords failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -526,6 +575,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertRecords failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -548,6 +598,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertRecord failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -571,6 +622,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("testInsertRecord failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException | RuntimeException e) {
         putBack(session);
@@ -594,6 +646,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -617,6 +670,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -641,6 +695,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteData failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -665,6 +720,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteData failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -683,6 +739,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("setStorageGroup failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -701,6 +758,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteStorageGroup failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -719,6 +777,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("deleteStorageGroups failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -737,6 +796,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("createTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -758,6 +818,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("createTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -780,6 +841,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("createMultiTimeseries failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -798,6 +860,7 @@ public class SessionPool {
         return resp;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("checkTimeseriesExists failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -828,6 +891,7 @@ public class SessionPool {
         return wrapper;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("executeQueryStatement failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
@@ -853,6 +917,7 @@ public class SessionPool {
         return;
       } catch (IoTDBConnectionException e) {
         // TException means the connection is broken, remove it and get a new one.
+        logger.warn("executeNonQueryStatement failed", e);
         cleanSessionAndMayThrowConnectionException(session, i, e);
       } catch (StatementExecutionException e) {
         putBack(session);
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
index 9076d19..fbca8e4 100644
--- a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -237,4 +237,11 @@ public class SessionPoolTest {
     }
   }
 
+  @Test
+  public void testClose() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false, null);
+    write10Data(pool, true);
+
+  }
+
 }
\ No newline at end of file