You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/03/03 17:56:13 UTC

[GitHub] [incubator-iotdb] jixuan1989 opened a new pull request #880: [IOTDB-538]add a simple connection pool for session api

jixuan1989 opened a new pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880
 
 
   Well, I think the pool is more easy to use than the description in IOTDB-538.
   
   We provided a connection pool (`SessionPool) for Native API.
   Using the interface, you need to define the pool size.
   If you can not get a session connection in 60 secondes, there is a warning log but the program will hang.
   If a session has finished an operation, it will be put back to the pool automatically.
   If a session connection is broken, the session will be removed automatically and the pool will try 
   to create a new session and redo the operation.
   For query operations:
   1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`;
   2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop to use it,
   you have to call `SessionPool.closeResultSet(wrapper)` manually;
   3. When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then
   you have to call `SessionPool.closeResultSet(wrapper)` manually;
   Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-594253440
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [3 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='2.5%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [2.5% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jixuan1989 commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-594250023
 
 
   I need to add a new api to enable user closing a SessionPool.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-598536299
 
 
   SonarCloud Quality Gate failed.
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/B.png' alt='B' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [1 Bug](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [4 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='0.0%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-598997733
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [4 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='0.0%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-594969744
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [3 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='2.4%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [2.4% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#discussion_r389237833
 
 

 ##########
 File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 ##########
 @@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create more session
+            size++;
+            //but we do it after skip synchronized block because connection a session is time consuming.
+            break;
+          } else {
+            //we have to wait for someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      return session;
+    }
+  }
+
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  /**
+   * close all connections in the pool
+   */
+  public synchronized void close() {
+    for (Session session : queue) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    for (Session session : occupied.keySet()) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    queue.clear();
+    occupied.clear();
+  }
+
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+    boolean putback = true;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        // the connection is broken.
+        removeSession();
+        putback = false;
+      } else {
+        throw e;
+      }
+    } finally {
+      Session session = occupied.remove(wrapper.session);
+      if (putback && session != null) {
+        putBack(wrapper.session);
+      }
+    }
+  }
+
+  private synchronized void removeSession() {
+    if (logger.isDebugEnabled()) {
+      logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+    }
+    size--;
+  }
+
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception e2) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * use batch interface to insert sorted data times in row batch must be sorted before!
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
 
 Review comment:
   >  the codes are double...
   Ah?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-595006088
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [3 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='2.4%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [2.4% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] qiaojialin commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-599007638
 
 
   The Session pool is already merged into rel/0.9 branch. If you want to use it in 0.9.x, rel/0.9 branch is the best choice

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#discussion_r390789834
 
 

 ##########
 File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 ##########
 @@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create more session
+            size++;
+            //but we do it after skip synchronized block because connection a session is time consuming.
+            break;
+          } else {
+            //we have to wait for someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      return session;
+    }
+  }
+
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  /**
+   * close all connections in the pool
+   */
+  public synchronized void close() {
+    for (Session session : queue) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    for (Session session : occupied.keySet()) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    queue.clear();
+    occupied.clear();
+  }
+
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+    boolean putback = true;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        // the connection is broken.
+        removeSession();
+        putback = false;
+      } else {
+        throw e;
+      }
+    } finally {
+      Session session = occupied.remove(wrapper.session);
+      if (putback && session != null) {
+        putBack(wrapper.session);
+      }
+    }
+  }
+
+  private synchronized void removeSession() {
+    if (logger.isDebugEnabled()) {
+      logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+    }
+    size--;
+  }
+
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception e2) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * use batch interface to insert sorted data times in row batch must be sorted before!
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
 
 Review comment:
   the interfaces are double...

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-594253440
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [3 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='2.5%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [2.5% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#discussion_r389237833
 
 

 ##########
 File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 ##########
 @@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create more session
+            size++;
+            //but we do it after skip synchronized block because connection a session is time consuming.
+            break;
+          } else {
+            //we have to wait for someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      return session;
+    }
+  }
+
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  /**
+   * close all connections in the pool
+   */
+  public synchronized void close() {
+    for (Session session : queue) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    for (Session session : occupied.keySet()) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    queue.clear();
+    occupied.clear();
+  }
+
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+    boolean putback = true;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        // the connection is broken.
+        removeSession();
+        putback = false;
+      } else {
+        throw e;
+      }
+    } finally {
+      Session session = occupied.remove(wrapper.session);
+      if (putback && session != null) {
+        putBack(wrapper.session);
+      }
+    }
+  }
+
+  private synchronized void removeSession() {
+    if (logger.isDebugEnabled()) {
+      logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+    }
+    size--;
+  }
+
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception e2) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * use batch interface to insert sorted data times in row batch must be sorted before!
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
 
 Review comment:
   >  the codes are double...
   
   Ah?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jixuan1989 commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-598523081
 
 
   The attachment is a patch for v0.9.1. 
   just use `git apply --check THE FILE` to check it is ok and then use `git apply THE FILE` to use it.
   [session_pool_0.9.1.patch.txt](https://github.com/apache/incubator-iotdb/files/4327592/session_pool_0.9.1.patch.txt)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-595006088
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [3 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='2.4%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [2.4% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#discussion_r388055708
 
 

 ##########
 File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 ##########
 @@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create more session
+            size++;
+            //but we do it after skip synchronized block because connection a session is time consuming.
+            break;
+          } else {
+            //we have to wait for someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
 
 Review comment:
   should the password be printed?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] jixuan1989 commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#discussion_r389237814
 
 

 ##########
 File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 ##########
 @@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create more session
+            size++;
+            //but we do it after skip synchronized block because connection a session is time consuming.
+            break;
+          } else {
+            //we have to wait for someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      return session;
+    }
+  }
+
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  /**
+   * close all connections in the pool
+   */
+  public synchronized void close() {
+    for (Session session : queue) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    for (Session session : occupied.keySet()) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    queue.clear();
+    occupied.clear();
+  }
+
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+    boolean putback = true;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        // the connection is broken.
+        removeSession();
+        putback = false;
+      } else {
+        throw e;
+      }
+    } finally {
+      Session session = occupied.remove(wrapper.session);
+      if (putback && session != null) {
+        putBack(wrapper.session);
+      }
+    }
+  }
+
+  private synchronized void removeSession() {
+    if (logger.isDebugEnabled()) {
+      logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+    }
+    size--;
+  }
+
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception e2) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * use batch interface to insert sorted data times in row batch must be sorted before!
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
+      throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSExecuteBatchStatementResp resp = session.insertSortedBatch(rowBatch);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * use batch interface to insert data
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSExecuteBatchStatementResp resp = session.insertBatch(rowBatch);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
 
 Review comment:
   will fix.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#discussion_r388056083
 
 

 ##########
 File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 ##########
 @@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create more session
+            size++;
+            //but we do it after skip synchronized block because connection a session is time consuming.
+            break;
+          } else {
+            //we have to wait for someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      return session;
+    }
+  }
+
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  /**
+   * close all connections in the pool
+   */
+  public synchronized void close() {
+    for (Session session : queue) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    for (Session session : occupied.keySet()) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    queue.clear();
+    occupied.clear();
+  }
+
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+    boolean putback = true;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        // the connection is broken.
+        removeSession();
+        putback = false;
+      } else {
+        throw e;
+      }
+    } finally {
+      Session session = occupied.remove(wrapper.session);
+      if (putback && session != null) {
+        putBack(wrapper.session);
+      }
+    }
+  }
+
+  private synchronized void removeSession() {
+    if (logger.isDebugEnabled()) {
+      logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+    }
+    size--;
+  }
+
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception e2) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * use batch interface to insert sorted data times in row batch must be sorted before!
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
 
 Review comment:
   Is it needed to export the methods of Session in this Pool? the codes are double...

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-598536299
 
 
   SonarCloud Quality Gate failed.
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/B.png' alt='B' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [1 Bug](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [4 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='0.0%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] commented on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-599002320
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [5 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='0.0%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-598997733
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [4 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='0.0%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [0.0% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
sonarcloud[bot] removed a comment on issue #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#issuecomment-594969744
 
 
   Kudos, SonarCloud Quality Gate passed!
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/bug.png' alt='Bug' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG) [0 Bugs](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=BUG)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/vulnerability.png' alt='Vulnerability' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) [0 Vulnerabilities](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=VULNERABILITY) (and [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/security_hotspot.png' alt='Security Hotspot' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) [0 Security Hotspots](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=SECURITY_HOTSPOT) to review)  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/common/code_smell.png' alt='Code Smell' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/RatingBadge/A.png' alt='A' width='16' height='16' />](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL) [3 Code Smells](https://sonarcloud.io/project/issues?id=apache_incubator-iotdb&pullRequest=880&resolved=false&types=CODE_SMELL)
   
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/CoverageChart/NoCoverageInfo.png' alt='No Coverage information' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880) No Coverage information  
   [<img src='https://sonarsource.github.io/sonarcloud-github-static-resources/v2/checks/Duplications/3.png' alt='2.4%' width='16' height='16' />](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list) [2.4% Duplication](https://sonarcloud.io/component_measures?id=apache_incubator-iotdb&pullRequest=880&metric=new_duplicated_lines_density&view=list)
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] qiaojialin commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
qiaojialin commented on a change in pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880#discussion_r388056181
 
 

 ##########
 File path: session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 ##########
 @@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create more session
+            size++;
+            //but we do it after skip synchronized block because connection a session is time consuming.
+            break;
+          } else {
+            //we have to wait for someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      return session;
+    }
+  }
+
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  /**
+   * close all connections in the pool
+   */
+  public synchronized void close() {
+    for (Session session : queue) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    for (Session session : occupied.keySet()) {
+      try {
+        session.close();
+      } catch (IoTDBSessionException e) {
+        //do nothing
+      }
+    }
+    queue.clear();
+    occupied.clear();
+  }
+
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+    boolean putback = true;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        // the connection is broken.
+        removeSession();
+        putback = false;
+      } else {
+        throw e;
+      }
+    } finally {
+      Session session = occupied.remove(wrapper.session);
+      if (putback && session != null) {
+        putBack(wrapper.session);
+      }
+    }
+  }
+
+  private synchronized void removeSession() {
+    if (logger.isDebugEnabled()) {
+      logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+    }
+    size--;
+  }
+
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception e2) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * use batch interface to insert sorted data times in row batch must be sorted before!
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
+      throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSExecuteBatchStatementResp resp = session.insertSortedBatch(rowBatch);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * use batch interface to insert data
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSExecuteBatchStatementResp resp = session.insertBatch(rowBatch);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
 
 Review comment:
   get a new one?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-iotdb] qiaojialin merged pull request #880: [IOTDB-538]add a simple connection pool for session api

Posted by GitBox <gi...@apache.org>.
qiaojialin merged pull request #880: [IOTDB-538]add a simple connection pool for session api
URL: https://github.com/apache/incubator-iotdb/pull/880
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services