Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/14 05:18:32 UTC

[incubator-iotdb] branch master updated: [IOTDB-538]add a simple connection pool for session api (#880)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f9fd91f  [IOTDB-538]add a simple connection pool for session api (#880)
f9fd91f is described below

commit f9fd91f80a53c978e32e26c232f24ad9254c9753
Author: Xiangdong Huang <hx...@qq.com>
AuthorDate: Sat Mar 14 13:18:21 2020 +0800

    [IOTDB-538]add a simple connection pool for session api (#880)
    
    * add a simple connection pool for session api
---
 .../4-Client/2-Programming - Native API.md         |  19 +-
 .../4-Client/2-Programming - Native API.md         |  25 +-
 .../main/java/org/apache/iotdb/SessionExample.java |   1 +
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   9 +
 .../org/apache/iotdb/session/SessionDataSet.java   |  10 +-
 .../iotdb/session/pool/SessionDataSetWrapper.java  |  81 +++
 .../org/apache/iotdb/session/pool/SessionPool.java | 669 +++++++++++++++++++++
 .../apache/iotdb/session/pool/SessionPoolTest.java | 236 ++++++++
 8 files changed, 1047 insertions(+), 3 deletions(-)

diff --git a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
index 25bb3ee..c434ba0 100644
--- a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md	
+++ b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md	
@@ -100,4 +100,21 @@
 
 浏览上述接口的详细信息,请参阅代码 ```session/src/main/java/org/apache/iotdb/session/Session.java```
 
-使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
\ No newline at end of file
+使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
+
+# 针对原生接口的连接池
+
+我们提供了一个针对原生接口的连接池(`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。
+如果超过60s都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
+
+当一个连接被用完后,他会自动返回池中等待下次被使用;
+当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+
+对于查询操作:
+
+1. 使用SessionPool进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`;
+2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`;
+3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`.
+4. 可以调用 `SessionDataSetWrapper` 的 `getColumnNames()` 方法得到结果集列名 
+
+使用示例可以参见 ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
index 336f711..ac72ebe 100644
--- a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md	
+++ b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md	
@@ -124,4 +124,27 @@ Here we show the commonly used interfaces and their parameters in the Native API
 
 To get more information of the following interfaces, please view session/src/main/java/org/apache/iotdb/session/Session.java
 
-The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
\ No newline at end of file
+The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
+
+
+# Session Pool for Native API
+
+We provide a connection pool (`SessionPool`) for the Native API.
+To use it, you only need to specify the pool size.
+
+If no session connection becomes available within 60 seconds, a warning is logged, and if the configured timeout (60 seconds by default) has also elapsed, an `IoTDBConnectionException` is thrown.
+
+Once a session has finished an operation, it is returned to the pool automatically.
+If a session connection is broken, the session is removed automatically and the pool tries
+to create a new session and redo the operation.
+
+For query operations:
+
+1. When using SessionPool to query data, the result set is returned as a `SessionDataSetWrapper`;
+2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and want to stop using it,
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+3. If calling `hasNext()` or `next()` on a `SessionDataSetWrapper` throws an exception,
+you also have to call `SessionPool.closeResultSet(wrapper)` manually;
+4. You can call `getColumnNames()` on a `SessionDataSetWrapper` to get the column names of the query result.
+
+Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
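
The pool behaviour described in the documentation above (a fixed maximum size, callers waiting until a session is returned, and a timeout) can be sketched in isolation. The following is a minimal, self-contained model and not the `SessionPool` code from this commit: `BoundedPool`, `borrow`, `giveBack` and the use of `IllegalStateException` are hypothetical names and choices made for illustration, and the sketch creates resources while holding the lock, whereas the commit opens the session after leaving the synchronized block.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical, simplified pool; not the IoTDB SessionPool itself. */
class BoundedPool<T> {
  private final Deque<T> idle = new ArrayDeque<>();
  private final Supplier<T> factory;
  private final int maxSize;
  private final long timeoutMs;
  private int size = 0; // resources created so far (idle + borrowed)

  BoundedPool(Supplier<T> factory, int maxSize, long timeoutMs) {
    this.factory = factory;
    this.maxSize = maxSize;
    this.timeoutMs = timeoutMs;
  }

  /** Reuses an idle resource, creates one while below maxSize, otherwise waits up to timeoutMs. */
  synchronized T borrow() {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (idle.isEmpty()) {
      if (size < maxSize) {
        size++;
        return factory.get();
      }
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        throw new IllegalStateException("timed out waiting for a pooled resource");
      }
      try {
        wait(remaining); // woken up by giveBack() or when the remaining time has passed
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for a pooled resource", e);
      }
    }
    return idle.pop();
  }

  /** Returns a resource to the pool and wakes up waiting borrowers. */
  synchronized void giveBack(T resource) {
    idle.push(resource);
    notifyAll();
  }
}

class BoundedPoolDemo {
  public static void main(String[] args) {
    BoundedPool<StringBuilder> pool = new BoundedPool<>(StringBuilder::new, 1, 100);
    StringBuilder first = pool.borrow();
    try {
      pool.borrow(); // the pool is exhausted, so this waits and then fails
    } catch (IllegalStateException e) {
      System.out.println("second borrow failed: " + e.getMessage());
    }
    pool.giveBack(first);
    System.out.println("reused the same instance: " + (pool.borrow() == first));
  }
}
```

Failing after the deadline rather than blocking forever keeps a caller from hanging on an exhausted pool. Whether to wait, warn, or throw is a design choice that the real pool exposes through its `timeout` constructor argument.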
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 280e2b6..5304cde 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -241,6 +241,7 @@ public class SessionExample {
   private static void query() throws IoTDBConnectionException, StatementExecutionException {
     SessionDataSet dataSet;
     dataSet = session.executeQueryStatement("select * from root.sg1.d1");
+    System.out.println(dataSet.getColumnNames());
     dataSet.setBatchSize(1024); // default is 512
     while (dataSet.hasNext()){
       System.out.println(dataSet.next());
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index d3df4a0..f0a1add 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -222,6 +222,15 @@ public class EnvironmentUtils {
     }
   }
 
+  public static void reactiveDaemon() {
+    if (daemon == null) {
+      daemon = new IoTDB();
+      daemon.active();
+    } else {
+      activeDaemon();
+    }
+  }
+
   private static void createAllDir() {
     // create sequential files
     for (String path : directoryManager.getAllSequenceFileFolders()) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index cb5540f..763922d 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -49,6 +49,7 @@ public class SessionDataSet {
   private long sessionId;
   private TSIService.Iface client;
   private int batchSize = 1024;
+  private List<String> columnNameList;
   private List<String> columnTypeDeduplicatedList;
   // duplicated column index -> origin index
   Map<Integer, Integer> duplicateLocation;
@@ -71,6 +72,7 @@ public class SessionDataSet {
     this.sql = sql;
     this.queryId = queryId;
     this.client = client;
+    this.columnNameList = columnNameList;
     currentBitmap = new byte[columnNameList.size()];
     columnSize = columnNameList.size();
 
@@ -101,6 +103,10 @@ public class SessionDataSet {
     this.batchSize = batchSize;
   }
 
+  public List<String> getColumnNames() {
+    return columnNameList;
+  }
+
   public boolean hasNext() throws IoTDBConnectionException, StatementExecutionException {
     if (hasCachedRecord) {
       return true;
@@ -129,6 +135,8 @@ public class SessionDataSet {
     return true;
   }
 
+
+
   private void constructOneRow() {
     List<Field> outFields = new ArrayList<>();
     int loc = 0;
@@ -221,7 +229,7 @@ public class SessionDataSet {
       RpcUtils.verifySuccess(closeResp);
     } catch (TException e) {
       throw new IoTDBConnectionException(
-          "Error occurs when connecting to server for close operation, because: " + e);
+          "Error occurs when connecting to server for close operation, because: " + e, e);
     }
   }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
new file mode 100644
index 0000000..c1cc7e2
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.util.List;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+public class SessionDataSetWrapper {
+
+  SessionDataSet sessionDataSet;
+  Session session;
+  SessionPool pool;
+
+  public SessionDataSetWrapper(SessionDataSet sessionDataSet,
+      Session session, SessionPool pool) {
+    this.sessionDataSet = sessionDataSet;
+    this.session = session;
+    this.pool = pool;
+  }
+
+  protected Session getSession() {
+    return session;
+  }
+
+  public int getBatchSize() {
+    return sessionDataSet.getBatchSize();
+  }
+
+  public void setBatchSize(int batchSize) {
+    sessionDataSet.setBatchSize(batchSize);
+  }
+
+  /**
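+   * If this method throws an exception and you do not want to use the result set anymore,
+   * you have to release the result set manually by calling SessionPool.closeResultSet.
+   * @return true if another row is available; otherwise the result set is released automatically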
+   * @throws IoTDBConnectionException
+   * @throws StatementExecutionException
+   */
+  public boolean hasNext() throws IoTDBConnectionException, StatementExecutionException {
+      boolean next = sessionDataSet.hasNext();
+      if (!next) {
+        pool.closeResultSet(this);
+      }
+      return next;
+  }
+  /**
+   * If this method throws an exception and you do not want to use the result set anymore,
+   * you have to release the result set manually by calling SessionPool.closeResultSet.
+   * @return the next row of the result set
+   * @throws IoTDBConnectionException
+   * @throws StatementExecutionException
+   */
+  public RowRecord next() throws IoTDBConnectionException, StatementExecutionException {
+    return sessionDataSet.next();
+  }
+
+  public List<String> getColumnNames() {
+    return sessionDataSet.getColumnNames();
+  }
+}
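
The release rule implemented by `SessionDataSetWrapper.hasNext()` above (the connection goes back to the pool only once the result set is exhausted) is easier to see in a stripped-down model. The sketch below is an illustration under stated assumptions, not IoTDB code: `FakeConnection`, `TinyPool` and `ResultCursor` are hypothetical, single-threaded stand-ins, and rows are plain strings.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a pooled session; not an IoTDB class. */
class FakeConnection {}

/** Hypothetical, single-threaded miniature of a pool that hands out result cursors. */
class TinyPool {
  private final Deque<FakeConnection> idle = new ArrayDeque<>();

  TinyPool(int size) {
    for (int i = 0; i < size; i++) {
      idle.push(new FakeConnection());
    }
  }

  /** Borrows a connection and keeps it occupied until the cursor is released. */
  ResultCursor query(List<String> rows) {
    FakeConnection connection = idle.pop(); // throws NoSuchElementException if none is idle
    return new ResultCursor(rows.iterator(), connection, this);
  }

  /** Returns the cursor's connection to the pool; calling it twice is harmless. */
  void closeResultSet(ResultCursor cursor) {
    if (cursor.connection != null) {
      idle.push(cursor.connection);
      cursor.connection = null;
    }
  }

  int available() {
    return idle.size();
  }
}

class ResultCursor {
  private final Iterator<String> rows;
  private final TinyPool pool;
  FakeConnection connection;

  ResultCursor(Iterator<String> rows, FakeConnection connection, TinyPool pool) {
    this.rows = rows;
    this.connection = connection;
    this.pool = pool;
  }

  /** Releases the connection automatically once no rows are left. */
  boolean hasNext() {
    boolean next = rows.hasNext();
    if (!next) {
      pool.closeResultSet(this);
    }
    return next;
  }

  String next() {
    return rows.next();
  }
}

class TinyPoolDemo {
  public static void main(String[] args) {
    TinyPool pool = new TinyPool(1);

    // Full scan: the last hasNext() call returns the connection to the pool.
    ResultCursor full = pool.query(Arrays.asList("r1", "r2"));
    while (full.hasNext()) {
      full.next();
    }
    System.out.println("available after full scan: " + pool.available());

    // Early stop: the connection stays occupied until it is released explicitly.
    ResultCursor partial = pool.query(Arrays.asList("r1", "r2"));
    try {
      partial.next();
    } finally {
      pool.closeResultSet(partial);
    }
    System.out.println("available after early stop: " + pool.available());
  }
}
```

Putting the explicit release in a `finally` block covers both cases the documentation calls out: stopping before the end of the result set, and an exception during iteration.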
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
new file mode 100644
index 0000000..1f7b579
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.BatchExecutionException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper around a set of Sessions.
+ * With SessionPool, users do not need to consider how to reuse a session connection.
+ * If a session is disconnected, the pool detects it, removes the broken
+ * session connection and creates a new one.
+ *
+ * If there are no available connections and the pool has reached its max size, all methods block
+ * until a connection becomes available or the timeout expires.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warning is logged.
+ *
+ * The only thing you have to remember is the following:
+ *
+ * For a query, if you have read all the data, i.e., SessionDataSetWrapper.hasNext() == false, nothing more is needed.
+ * Otherwise, i.e., you want to stop the query before reading all the data (SessionDataSetWrapper.hasNext() == true),
+ * you have to call closeResultSet(SessionDataSetWrapper wrapper) manually;
+ * if you do not, the connection stays occupied by the query.
+ *
+ * You also have to call closeResultSet() manually when an exception is thrown
+ * by SessionDataSetWrapper.hasNext() or next().
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  private long timeout; //ms
+  private static int RETRY = 3;
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE, 60_000);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize,
+      long timeout) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+    this.timeout = timeout;
+  }
+
+  //this method throws an exception if the server is unreachable, the ip/port/user/password is incorrect, or waiting for a session times out.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+  private Session getSession() throws IoTDBConnectionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create one more session,
+            size++;
+            //but we do it after leaving the synchronized block because opening a session is time consuming.
+            break;
+          } else {
+            //we have to wait until someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has waited for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+                if (System.currentTimeMillis() - start > timeout) {
+                  throw new IoTDBConnectionException(
+                      String.format("timeout to get a connection from %s:%s", ip, port));
+                }
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      return session;
+    }
+  }
+
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  /**
+   * close all connections in the pool
+   */
+  public synchronized void close() {
+    for (Session session : queue) {
+      try {
+        session.close();
+      } catch (IoTDBConnectionException e) {
+        //do nothing
+      }
+    }
+    for (Session session : occupied.keySet()) {
+      try {
+        session.close();
+      } catch (IoTDBConnectionException e) {
+        //do nothing
+      }
+    }
+    queue.clear();
+    occupied.clear();
+  }
+
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws StatementExecutionException {
+    boolean putback = true;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException e) {
+      removeSession();
+      putback = false;
+    } finally {
+      Session session = occupied.remove(wrapper.session);
+      if (putback && session != null) {
+        putBack(wrapper.session);
+      }
+    }
+  }
+
+  private synchronized void removeSession() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+    }
+    size--;
+  }
+
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception e2) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * use the batch interface to insert sorted data; the timestamps in the row batch must already be sorted
+   *
+   * @param rowBatch data batch
+   */
+  public void insertSortedBatch(RowBatch rowBatch)
+      throws IoTDBConnectionException, BatchExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.insertSortedBatch(rowBatch);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (BatchExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * use batch interface to insert data
+   *
+   * @param rowBatch data batch
+   */
+  public void insertBatch(RowBatch rowBatch)
+      throws IoTDBConnectionException, BatchExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.insertBatch(rowBatch);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (BatchExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * Insert data in batch format, which can reduce network overhead. This method works like a
+   * JDBC batch insert: several insert requests are packed into one batch and sent to the server. If you
+   * want better performance, please see the insertBatch method
+   *
+   * @see Session#insertBatch(RowBatch)
+   */
+  public void insertInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, BatchExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.insertInBatch(deviceIds, times, measurementsList, valuesList);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (BatchExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * insert data in one row; if you want better performance, please use the insertInBatch method
+   * or the insertBatch method
+   *
+   * @see Session#insertInBatch(List, List, List, List)
+   * @see Session#insertBatch(RowBatch)
+   */
+  public TSStatus insert(String deviceId, long time, List<String> measurements, List<String> values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.insert(deviceId, time, measurements, values);
+        putBack(session);
+        return resp;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server returns as soon as it accepts
+   * the request. It should be used to measure the other time costs on the client side
+   */
+  public void testInsertBatch(RowBatch rowBatch)
+      throws IoTDBConnectionException, BatchExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.testInsertBatch(rowBatch);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (BatchExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server returns as soon as it accepts
+   * the request. It should be used to measure the other time costs on the client side
+   */
+  public void testInsertInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, BatchExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session
+            .testInsertInBatch(deviceIds, times, measurementsList, valuesList);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (BatchExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server returns as soon as it accepts
+   * the request. It should be used to measure the other time costs on the client side
+   */
+  public void testInsert(String deviceId, long time, List<String> measurements,
+      List<String> values) throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.testInsert(deviceId, time, measurements, values);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * delete a timeseries, including data and schema
+   *
+   * @param path timeseries to delete, should be a whole path
+   */
+  public void deleteTimeseries(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteTimeseries(path);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * delete multiple timeseries, including data and schema
+   *
+   * @param paths timeseries to delete, each should be a whole path
+   */
+  public void deleteTimeseries(List<String> paths)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteTimeseries(paths);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * delete data <= time in one timeseries
+   *
+   * @param path data in which time series to delete
+   * @param time data with time stamp less than or equal to time will be deleted
+   */
+  public void deleteData(String path, long time)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteData(path, time);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * delete data <= time in multiple timeseries
+   *
+   * @param paths data in which time series to delete
+   * @param time data with time stamp less than or equal to time will be deleted
+   */
+  public void deleteData(List<String> paths, long time)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteData(paths, time);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  public void setStorageGroup(String storageGroupId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.setStorageGroup(storageGroupId);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  public void deleteStorageGroup(String storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteStorageGroup(storageGroup);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  public void deleteStorageGroups(List<String> storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteStorageGroups(storageGroup);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  public void createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor) throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.createTimeseries(path, dataType, encoding, compressor);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken; remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  public boolean checkTimeseriesExists(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        boolean resp = session.checkTimeseriesExists(path);
+        putBack(session);
+        return resp;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken; remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * Execute a query statement. Users must call closeResultSet(SessionDataSetWrapper) once they
+   * no longer need the SessionDataSet; they do not need to call
+   * sessionDataSet.closeOperationHandle() themselves.
+   *
+   * @param sql query statement
+   * @return the result set. Callers must keep the returned instance and close it; otherwise the
+   * session it occupies is not returned to the pool.
+   */
+  public SessionDataSetWrapper executeQueryStatement(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        SessionDataSet resp = session.executeQueryStatement(sql);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken; remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+  /**
+   * execute non query statement
+   *
+   * @param sql non query statement
+   */
+  public void executeNonQueryStatement(String sql)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.executeNonQueryStatement(sql);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // IoTDBConnectionException means the connection is broken; remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (StatementExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
new file mode 100644
index 0000000..59dda6b
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import static org.junit.Assert.*;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+// This test does not check the correctness of the Session API, so it exercises only a small part of it.
+public class SessionPoolTest {
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+
+  @Test
+  public void insert() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          pool.insert("root.sg1.d1", 1, Collections.singletonList("s" + no), Collections.singletonList("3"));
+        } catch (IoTDBConnectionException | StatementExecutionException e) {
+          fail();
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+    assertTrue(pool.currentAvailableSize() <= 3);
+    assertEquals(0, pool.currentOccupiedSize());
+    pool.close();
+  }
+
+  @Test
+  public void incorrectSQL() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    assertEquals(0, pool.currentAvailableSize());
+    try {
+      pool.insert(".root.sg1.d1", 1, Collections.singletonList("s" ), Collections.singletonList("3"));
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      //do nothing
+    }
+    assertEquals(1, pool.currentAvailableSize());
+    pool.close();
+  }
+
+
+  @Test
+  public void incorrectExecuteQueryStatement() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        fail();
+      }
+    }
+    //now let's query
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          SessionDataSetWrapper wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+          // This is incorrect because the wrapper is never closed,
+          // so the other 7 queries will be blocked.
+        } catch (IoTDBConnectionException | StatementExecutionException e) {
+          fail();
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertFalse(service.awaitTermination(3, TimeUnit.SECONDS));
+      assertEquals(0, pool.currentAvailableSize());
+      assertTrue(pool.currentOccupiedSize() <= 3);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+    pool.close();
+  }
+
+  @Test
+  public void executeQueryStatement() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    correctQuery(pool);
+    pool.close();
+  }
+
+  private void correctQuery(SessionPool pool) {
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        fail();
+      }
+    }
+    //now let's query
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          SessionDataSetWrapper wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+          pool.closeResultSet(wrapper);
+          pool.closeResultSet(wrapper);
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail();
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+      assertTrue(pool.currentAvailableSize() <= 3);
+      assertEquals(0, pool.currentOccupiedSize());
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void tryIfTheServerIsRestart() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1,6000);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        fail();
+      }
+    }
+    SessionDataSetWrapper wrapper = null;
+    try {
+      wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+      EnvironmentUtils.stopDaemon();
+      //user does not know what happens.
+      while(wrapper.hasNext()) {
+        wrapper.next();
+      }
+    } catch (IoTDBConnectionException  e) {
+      try {
+        pool.closeResultSet(wrapper);
+      } catch (StatementExecutionException ex) {
+        ex.printStackTrace();
+        fail();
+      }
+      EnvironmentUtils.reactiveDaemon();
+      correctQuery(pool);
+      pool.close();
+      return;
+    } catch (StatementExecutionException e) {
+      fail("should be TTransportException but got an exception: " + e.getMessage());
+    }
+    fail("should have thrown an exception but did not");
+  }
+
+  @Test
+  public void tryIfTheServerIsRestartButDataIsGotten() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1,60000);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        fail();
+      }
+    }
+    assertEquals(1, pool.currentAvailableSize());
+    SessionDataSetWrapper wrapper = null;
+    try {
+      wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+      //user does not know what happens.
+      assertEquals(0, pool.currentAvailableSize());
+      assertEquals(1, pool.currentOccupiedSize());
+      while(wrapper.hasNext()) {
+        wrapper.next();
+      }
+      assertEquals(1, pool.currentAvailableSize());
+      assertEquals(0, pool.currentOccupiedSize());
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail();
+    }
+    pool.close();
+  }
+
+}
\ No newline at end of file
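
Every SessionPool method in the diff above repeats the same loop: take a session, run one call, put the session back on success or on a statement error, and discard it and retry on a connection error. The following is only a minimal sketch of that pattern, not the code from this commit. All names in it (RetryingPoolSketch, Conn, Action, ConnectionBrokenException, StatementFailedException, borrow, execute) are hypothetical stand-ins, and it leaves out the maximum pool size and the blocking wait that the real pool has.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RetryingPoolSketch {

  /** Hypothetical stand-in for IoTDBConnectionException: the connection itself is broken. */
  static class ConnectionBrokenException extends Exception {
    ConnectionBrokenException(String message) { super(message); }
  }

  /** Hypothetical stand-in for StatementExecutionException: the statement failed, the connection is fine. */
  static class StatementFailedException extends Exception {
    StatementFailedException(String message) { super(message); }
  }

  /** Hypothetical connection type; only its id matters here. */
  static class Conn {
    final int id;
    Conn(int id) { this.id = id; }
  }

  /** One operation executed against a borrowed connection. */
  interface Action<T> {
    T run(Conn conn) throws ConnectionBrokenException, StatementFailedException;
  }

  static final int RETRY = 3;

  private final Deque<Conn> idle = new ArrayDeque<>();
  private int created = 0;

  /** Reuses an idle connection if one exists, otherwise creates a new one (no size limit in this sketch). */
  synchronized Conn borrow() {
    Conn conn = idle.poll();
    return conn != null ? conn : new Conn(++created);
  }

  synchronized void putBack(Conn conn) { idle.push(conn); }

  synchronized int idleSize() { return idle.size(); }

  synchronized int createdCount() { return created; }

  /** Runs the action, retrying up to RETRY times when the connection is broken. */
  <T> T execute(Action<T> action) throws ConnectionBrokenException, StatementFailedException {
    for (int i = 0; i < RETRY; i++) {
      Conn conn = borrow();
      try {
        T result = action.run(conn);
        putBack(conn);
        return result;
      } catch (ConnectionBrokenException e) {
        // Broken connection: do not put it back; the next iteration borrows another one.
      } catch (StatementFailedException e) {
        // The connection is still usable, so return it before reporting the error.
        putBack(conn);
        throw e;
      }
    }
    throw new ConnectionBrokenException(
        String.format("retry to execute statement failed %d times", RETRY));
  }
}
```

With a helper of this shape, a pooled call could be written as a single expression such as pool.execute(conn -> ...), which is one possible way to avoid copying the loop into every method. Whether that refactoring suits SessionPool is a design question this sketch does not settle.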