You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/03/03 17:54:41 UTC

[incubator-iotdb] 01/02: add a simple connection pool for session api

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch simple_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 75647e03b45973a0ce8593dca7304ab9f96364f4
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Mar 4 01:52:56 2020 +0800

    add a simple connection pool for session api
---
 .../4-Client/2-Programming - Native API.md         |  18 +-
 .../4-Client/2-Programming - Native API.md         |  24 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   9 +
 .../java/org/apache/iotdb/session/Session.java     |   1 +
 .../org/apache/iotdb/session/SessionDataSet.java   |   4 +-
 .../iotdb/session/pool/SessionDataSetWrapper.java  |  62 ++
 .../org/apache/iotdb/session/pool/SessionPool.java | 647 +++++++++++++++++++++
 .../apache/iotdb/session/pool/SessionPoolTest.java | 231 ++++++++
 8 files changed, 992 insertions(+), 4 deletions(-)

diff --git a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
index 86d6a57..34fed0e 100644
--- a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md	
+++ b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md	
@@ -98,4 +98,20 @@
 
 浏览上述接口的详细信息,请参阅代码 ```session/src/main/java/org/apache/iotdb/session/Session.java```
 
-使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
\ No newline at end of file
+使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
+
+# 针对原生接口的连接池
+
+我们提供了一个针对原生接口的连接池(`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。
+如果超过60s都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
+
+当一个连接被用完后,它会自动返回池中等待下次被使用;
+当一个连接损坏后,它会从池中被删除,并重建一个连接重新执行用户的操作。
+
+对于查询操作:
+
+1. 使用SessionPool进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`;
+2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`;
+3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`.
+
+使用示例可以参见 ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
index 336f711..c88c45a 100644
--- a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md	
+++ b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md	
@@ -124,4 +124,26 @@ Here we show the commonly used interfaces and their parameters in the Native API
 
 To get more information of the following interfaces, please view session/src/main/java/org/apache/iotdb/session/Session.java
 
-The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
\ No newline at end of file
+The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
+
+
+# Session Pool for Native API
+
+We provide a connection pool (`SessionPool`) for the Native API.
+Using the interface, you need to define the pool size.
+
+If you cannot get a session connection within 60 seconds, a warning is logged, but the program keeps waiting.
+
+If a session has finished an operation, it will be put back to the pool automatically.
+If a session connection is broken, the session will be removed automatically and the pool will try 
+to create a new session and redo the operation.
+
+For query operations:
+
+1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`;
+2. Given a `SessionDataSetWrapper`, if you stop using it before you have scanned all the data in it,
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+3. If `hasNext()` or `next()` of a `SessionDataSetWrapper` throws an exception,
+you have to call `SessionPool.closeResultSet(wrapper)` manually.
+
+Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 5d7e025..8b7b884 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -224,6 +224,15 @@ public class EnvironmentUtils {
     }
   }
 
+  /**
+   * Makes sure the shared IoTDB daemon is active: an existing daemon is handed to
+   * {@code activeDaemon()}, otherwise a new {@code IoTDB} instance is created and activated.
+   */
+  public static void reactiveDaemon() {
+    if (daemon != null) {
+      // a daemon already exists, reuse the regular activation path
+      activeDaemon();
+      return;
+    }
+    daemon = new IoTDB();
+    daemon.active();
+  }
+
   private static void createAllDir() {
     // create sequential files
     for (String path : directoryManager.getAllSequenceFileFolders()) {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2db5923..157d2ec 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -658,4 +658,5 @@ public class Session {
     }
   }
 
+
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index aeee91e..311833a 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -219,10 +219,10 @@ public class SessionDataSet {
       TSStatus closeResp = client.closeOperation(closeReq);
       RpcUtils.verifySuccess(closeResp);
     } catch (IoTDBRPCException e) {
-      throw new SQLException("Error occurs for close opeation in server side. The reason is " + e);
+      throw new SQLException("Error occurs for close opeation in server side. The reason is " + e, e);
     } catch (TException e) {
       throw new SQLException(
-          "Error occurs when connecting to server for close operation, because: " + e);
+          "Error occurs when connecting to server for close operation, because: " + e, e);
     }
   }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
new file mode 100644
index 0000000..29fa5ec
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+public class SessionDataSetWrapper {
+  SessionDataSet sessionDataSet;
+  Session session;
+  SessionPool pool;
+
+  public SessionDataSetWrapper(SessionDataSet sessionDataSet,
+      Session session, SessionPool pool) {
+    this.sessionDataSet = sessionDataSet;
+    this.session = session;
+    this.pool = pool;
+  }
+
+  protected Session getSession() {
+    return session;
+  }
+
+  public int getBatchSize() {
+    return sessionDataSet.getBatchSize();
+  }
+
+  public void setBatchSize(int batchSize) {
+    sessionDataSet.setBatchSize(batchSize);
+  }
+
+  public boolean hasNext() throws SQLException, IoTDBRPCException {
+    boolean next = sessionDataSet.hasNext();
+    if (!next) {
+      pool.closeResultSet(this);
+    }
+    return next;
+  }
+
+  public RowRecord next() throws SQLException, IoTDBRPCException {
+    return sessionDataSet.next();
+  }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
new file mode 100644
index 0000000..5956dfe
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a pool of {@link Session} connections to one IoTDB server that is meant to be
+ * shared between threads. Callers do not have to care about reusing a session connection: every
+ * operation borrows a session, runs, and returns the session to the pool.
+ *
+ * If an operation fails because the connection is broken (the failure is caused by a
+ * TException), the broken session is closed and removed from the pool, and the operation is
+ * retried on another session.
+ *
+ * If there is no available connection and the pool has reached its max size, every method waits
+ * until a connection becomes available. A warning is logged each time a caller has waited for
+ * another 60 seconds.
+ *
+ * The only thing you have to remember concerns queries:
+ *
+ * If you read all data, i.e., SessionDataSetWrapper.hasNext() returned false, the connection has
+ * already been released. Otherwise, i.e., you stop before you got all data, or hasNext() or
+ * next() threw an exception, you have to call closeResultSet(SessionDataSetWrapper) manually;
+ * until then the connection stays occupied by the query. Calling closeResultSet more than once
+ * for the same wrapper is harmless.
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+
+  /** How long a waiting caller sleeps before it re-checks the pool, in milliseconds. */
+  private static final long WAIT_INTERVAL_MS = 1000;
+  /** A warning is logged each time a caller has waited this much longer, in milliseconds. */
+  private static final long WARN_INTERVAL_MS = 60_000;
+
+  /** Idle sessions that are ready to be borrowed. */
+  private final ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  /** Sessions bound to a query result set that has not been released yet. */
+  private final ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+  /** Number of sessions that exist or are currently being created; guarded by {@code this}. */
+  private int size = 0;
+  private final int maxSize;
+  private final String ip;
+  private final int port;
+  private final String user;
+  private final String password;
+  private final int fetchSize;
+
+  /** One operation executed on a borrowed session. */
+  private interface SessionCall<T> {
+
+    T apply(Session session) throws IoTDBSessionException;
+  }
+
+  /** Creates a pool that uses {@code Config.DEFAULT_FETCH_SIZE} for its sessions. */
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  /**
+   * @param maxSize the maximal number of sessions kept by this pool, must be positive
+   * @param fetchSize the fetch size passed to every session created by this pool
+   * @throws IllegalArgumentException if maxSize is not positive (such a pool could never hand
+   * out a session, so every call would wait forever)
+   */
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    if (maxSize <= 0) {
+      throw new IllegalArgumentException("maxSize must be positive, but is " + maxSize);
+    }
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  /**
+   * Borrows a session, creating a new one if the pool is not full yet, or waiting otherwise.
+   * If this method throws an exception, either the server is broken, or the
+   * ip/port/user/password is incorrect.
+   */
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    }
+    session = waitForSessionOrSlot();
+    if (session != null) {
+      return session;
+    }
+    // A slot has been reserved for us. The connection is opened outside of the lock because
+    // connecting is time consuming. The password is deliberately not logged.
+    logger.debug("Create a new Session {}, {}, {}", ip, port, user);
+    boolean opened = false;
+    try {
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      opened = true;
+      return session;
+    } finally {
+      if (!opened) {
+        // give the reserved slot back, otherwise maxSize failed attempts block the pool forever
+        closeSession(session);
+        removeSession();
+      }
+    }
+  }
+
+  /**
+   * Waits until an idle session can be taken or a new session may be created.
+   *
+   * @return an idle session, or null if a slot was reserved ({@code size} was incremented) and
+   * the caller has to create the session itself
+   */
+  private synchronized Session waitForSessionOrSlot() {
+    long start = System.currentTimeMillis();
+    long nextWarn = start + WARN_INTERVAL_MS;
+    boolean interrupted = false;
+    try {
+      while (true) {
+        // Poll while holding the lock: putBack() notifies under the same lock, so a session
+        // returned after this poll cannot be missed by the wait below.
+        Session session = queue.poll();
+        if (session != null) {
+          return session;
+        }
+        if (size < maxSize) {
+          size++;
+          return null;
+        }
+        try {
+          this.wait(WAIT_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          // Keep waiting, but remember the interruption. Re-setting the flag right here would
+          // make every following wait() throw immediately and turn this loop into a busy spin.
+          logger.warn("interrupted while waiting for a session, keep waiting", e);
+          interrupted = true;
+        }
+        long now = System.currentTimeMillis();
+        if (now >= nextWarn) {
+          logger.warn(
+              "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}",
+              (now - start) / 1000, ip, port, user);
+          nextWarn = now + WARN_INTERVAL_MS;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        // restore the interrupt status for the caller
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /** @return the number of idle sessions */
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  /** @return the number of sessions occupied by unreleased query result sets */
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  /** Returns a healthy session to the pool and wakes up the waiting callers. */
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  /** Marks a session as bound to a result set that has not been released yet. */
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  /**
+   * Closes the server side handle of a query and releases the session of the query. Calling
+   * this method again for a wrapper that has already been released does nothing.
+   *
+   * @throws SQLException if closing fails for a reason other than a broken connection; the
+   * session is returned to the pool in that case
+   */
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+    // Whoever removes the session from `occupied` owns the release. A second call must not
+    // touch the session any more, because it may already be used by another thread.
+    Session session = occupied.remove(wrapper.session);
+    if (session == null) {
+      return;
+    }
+    boolean broken = false;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        // the connection is broken.
+        broken = true;
+      } else {
+        throw e;
+      }
+    } finally {
+      if (broken) {
+        closeSession(session);
+        removeSession();
+      } else {
+        putBack(session);
+      }
+    }
+  }
+
+  /** Frees the slot of a session that left the pool, so that a new session can be created. */
+  private synchronized void removeSession() {
+    logger.debug("Remove a broken Session {}, {}, {}", ip, port, user);
+    size--;
+    this.notifyAll();
+  }
+
+  /** Closes a session on a best-effort basis. */
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception ignored) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * Runs an operation on a borrowed session. If the operation fails because of a TException,
+   * the connection is considered broken: the session is dropped and the operation is retried on
+   * another session. Any other IoTDBSessionException is rethrown after the session has been
+   * returned to the pool.
+   */
+  private <T> T execute(SessionCall<T> call) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        T resp = call.apply(session);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      } catch (RuntimeException e) {
+        // The state of the session is unknown. Drop it instead of leaking its slot.
+        closeSession(session);
+        removeSession();
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * use batch interface to insert sorted data. Times in the row batch must be sorted before!
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
+      throws IoTDBSessionException {
+    return execute(session -> session.insertSortedBatch(rowBatch));
+  }
+
+  /**
+   * use batch interface to insert data
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException {
+    return execute(session -> session.insertBatch(rowBatch));
+  }
+
+  /**
+   * Insert data in batch format, which can reduce the overhead of network. This method is just
+   * like jdbc batch insert, we pack some insert request in batch and send them to server. If you
+   * want to improve your performance, please see insertBatch method
+   *
+   * @see Session#insertBatch(RowBatch)
+   */
+  public List<TSStatus> insertInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBSessionException {
+    return execute(
+        session -> session.insertInBatch(deviceIds, times, measurementsList, valuesList));
+  }
+
+  /**
+   * insert data in one row, if you want to improve your performance, please use insertInBatch
+   * method or insertBatch method
+   *
+   * @see Session#insertInBatch(List, List, List, List)
+   * @see Session#insertBatch(RowBatch)
+   */
+  public TSStatus insert(String deviceId, long time, List<String> measurements, List<String> values)
+      throws IoTDBSessionException {
+    return execute(session -> session.insert(deviceId, time, measurements, values));
+  }
+
+  /**
+   * This method does NOT insert data into database and the server just returns after accepting
+   * the request, this method should be used to test other time cost in client
+   */
+  public TSExecuteBatchStatementResp testInsertBatch(RowBatch rowBatch)
+      throws IoTDBSessionException {
+    return execute(session -> session.testInsertBatch(rowBatch));
+  }
+
+  /**
+   * This method does NOT insert data into database and the server just returns after accepting
+   * the request, this method should be used to test other time cost in client
+   */
+  public List<TSStatus> testInsertInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBSessionException {
+    return execute(
+        session -> session.testInsertInBatch(deviceIds, times, measurementsList, valuesList));
+  }
+
+  /**
+   * This method does NOT insert data into database and the server just returns after accepting
+   * the request, this method should be used to test other time cost in client
+   */
+  public TSStatus testInsert(String deviceId, long time, List<String> measurements,
+      List<String> values) throws IoTDBSessionException {
+    return execute(session -> session.testInsert(deviceId, time, measurements, values));
+  }
+
+  /**
+   * delete a timeseries, including data and schema
+   *
+   * @param path timeseries to delete, should be a whole path
+   */
+  public TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
+    return execute(session -> session.deleteTimeseries(path));
+  }
+
+  /**
+   * delete timeseries, including data and schema
+   *
+   * @param paths timeseries to delete, should be whole paths
+   */
+  public TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
+    return execute(session -> session.deleteTimeseries(paths));
+  }
+
+  /**
+   * delete data <= time in one timeseries
+   *
+   * @param path data in which time series to delete
+   * @param time data with time stamp less than or equal to time will be deleted
+   */
+  public TSStatus deleteData(String path, long time) throws IoTDBSessionException {
+    return execute(session -> session.deleteData(path, time));
+  }
+
+  /**
+   * delete data <= time in multiple timeseries
+   *
+   * @param paths data in which time series to delete
+   * @param time data with time stamp less than or equal to time will be deleted
+   */
+  public TSStatus deleteData(List<String> paths, long time) throws IoTDBSessionException {
+    return execute(session -> session.deleteData(paths, time));
+  }
+
+  /** Forwards to {@code Session.setStorageGroup} on a pooled session. */
+  public TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
+    return execute(session -> session.setStorageGroup(storageGroupId));
+  }
+
+  /** Forwards to {@code Session.deleteStorageGroup} on a pooled session. */
+  public TSStatus deleteStorageGroup(String storageGroup) throws IoTDBSessionException {
+    return execute(session -> session.deleteStorageGroup(storageGroup));
+  }
+
+  /** Forwards to {@code Session.deleteStorageGroups} on a pooled session. */
+  public TSStatus deleteStorageGroups(List<String> storageGroup) throws IoTDBSessionException {
+    return execute(session -> session.deleteStorageGroups(storageGroup));
+  }
+
+  /** Forwards to {@code Session.createTimeseries} on a pooled session. */
+  public TSStatus createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor) throws IoTDBSessionException {
+    return execute(session -> session.createTimeseries(path, dataType, encoding, compressor));
+  }
+
+  /** Forwards to {@code Session.checkTimeseriesExists} on a pooled session. */
+  public boolean checkTimeseriesExists(String path) throws IoTDBSessionException {
+    return this.<Boolean>execute(session -> session.checkTimeseriesExists(path));
+  }
+
+  /**
+   * execute a query sql. Users must call closeResultSet(SessionDataSetWrapper) if they do not
+   * use the result set any more before it is exhausted. Users do not need to call
+   * sessionDataSet.closeOperationHandle() any more.
+   *
+   * @param sql query statement
+   * @return result set. Notice that you must keep the result instance and release it, otherwise
+   * the session stays occupied.
+   */
+  public SessionDataSetWrapper executeQueryStatement(String sql)
+      throws IoTDBRPCException, IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        SessionDataSet resp = session.executeQueryStatement(sql);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        // the session stays out of the idle queue until the result set is released
+        occupy(session);
+        return wrapper;
+      } catch (TException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (IoTDBRPCException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      } catch (RuntimeException e) {
+        // The state of the session is unknown. Drop it instead of leaking its slot.
+        closeSession(session);
+        removeSession();
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * execute non query statement
+   *
+   * @param sql non query statement
+   */
+  public void executeNonQueryStatement(String sql)
+      throws IoTDBRPCException, IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        session.executeNonQueryStatement(sql);
+        putBack(session);
+        return;
+      } catch (TException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (IoTDBRPCException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      } catch (RuntimeException e) {
+        // The state of the session is unknown. Drop it instead of leaking its slot.
+        closeSession(session);
+        removeSession();
+        throw e;
+      }
+    }
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
new file mode 100644
index 0000000..5114b28
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -0,0 +1,231 @@
+package org.apache.iotdb.session.pool;
+
+import static org.junit.Assert.*;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+// These tests exercise the pooling behaviour of SessionPool rather than the correctness of the
+// Session API, so only a few of the API calls (insert and executeQueryStatement) are used.
+public class SessionPoolTest {
+
+  /**
+   * Prepares the environment before each test: points the {@code IoTDBConstant.IOTDB_CONF}
+   * system property at the test resources directory, then calls
+   * {@code EnvironmentUtils.closeStatMonitor()} followed by {@code EnvironmentUtils.envSetUp()}.
+   *
+   * <p>NOTE(review): envSetUp() presumably starts the local server that the tests reach on
+   * 127.0.0.1:6667 -- confirm against EnvironmentUtils.
+   */
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+  }
+
+  /** Calls {@code EnvironmentUtils.cleanEnv()} after each test. */
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+
+  /**
+   * Submits ten concurrent inserts through a pool created with a size argument of 3 and checks
+   * that, once all inserts are done, three sessions are available and none is occupied.
+   */
+  @Test
+  public void insert() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    // Anything thrown inside a task passed to submit() -- including the AssertionError raised by
+    // fail() -- is captured by the returned Future and never reaches JUnit. Worker failures are
+    // therefore recorded here and asserted on the test thread.
+    ConcurrentLinkedQueue<Throwable> workerFailures = new ConcurrentLinkedQueue<>();
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          pool.insert("root.sg1.d1", 1, Collections.singletonList("s" + no),
+              Collections.singletonList("3"));
+        } catch (Exception e) {
+          workerFailures.add(e);
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to whoever runs this test
+      Thread.currentThread().interrupt();
+      e.printStackTrace();
+      fail();
+    }
+    assertTrue("inserts failed on worker threads: " + workerFailures, workerFailures.isEmpty());
+    assertEquals(3, pool.currentAvailableSize());
+    assertEquals(0, pool.currentOccupiedSize());
+  }
+
+  /**
+   * Inserts into a device path that starts with a dot and checks that the pool reports one
+   * available session afterwards, whether or not the insert raised an IoTDBSessionException.
+   */
+  @Test
+  public void incorrectSQL() {
+    final SessionPool sessionPool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    // a freshly constructed pool reports no available session yet
+    assertEquals(0, sessionPool.currentAvailableSize());
+    final String malformedDevice = ".root.sg1.d1";
+    try {
+      sessionPool.insert(malformedDevice, 1, Collections.singletonList("s"),
+          Collections.singletonList("3"));
+    } catch (IoTDBSessionException ignored) {
+      // the outcome of the insert itself is not what this test checks
+    }
+    assertEquals(1, sessionPool.currentAvailableSize());
+  }
+
+
+  /**
+   * Shows what happens when query results are never closed: ten queries are submitted against a
+   * pool created with a size argument of 3, none of the returned wrappers is closed, and the
+   * test expects the executor not to terminate within 3 seconds while the pool reports zero
+   * available and three occupied sessions.
+   */
+  @Test
+  public void incorrectExecuteQueryStatement() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i),
+            Collections.singletonList("" + i));
+      } catch (IoTDBSessionException e) {
+        fail();
+      }
+    }
+    // Anything thrown inside a task passed to submit() -- including the AssertionError raised by
+    // fail() -- is captured by the returned Future and never reaches JUnit. Worker failures are
+    // therefore recorded here and asserted on the test thread.
+    ConcurrentLinkedQueue<Throwable> workerFailures = new ConcurrentLinkedQueue<>();
+    //now let's query
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          // This is deliberately incorrect: the returned wrapper is not closed,
+          // so all other 7 queries will be blocked.
+          pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+        } catch (Exception e) {
+          workerFailures.add(e);
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertFalse(service.awaitTermination(3, TimeUnit.SECONDS));
+      assertTrue("queries failed on worker threads: " + workerFailures,
+          workerFailures.isEmpty());
+      assertEquals(0, pool.currentAvailableSize());
+      assertEquals(3, pool.currentOccupiedSize());
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to whoever runs this test
+      Thread.currentThread().interrupt();
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  /** Runs the {@code correctQuery} scenario against a newly created pool. */
+  @Test
+  public void executeQueryStatement() {
+    correctQuery(new SessionPool("127.0.0.1", 6667, "root", "root", 3));
+  }
+
+  /**
+   * Inserts ten points, then runs ten concurrent queries that each close their result set, and
+   * checks that the pool ends up with three available and zero occupied sessions.
+   *
+   * @param pool the pool under test
+   */
+  private void correctQuery(SessionPool pool) {
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i),
+            Collections.singletonList("" + i));
+      } catch (IoTDBSessionException e) {
+        fail();
+      }
+    }
+    // Anything thrown inside a task passed to submit() -- including the AssertionError raised by
+    // fail() -- is captured by the returned Future and never reaches JUnit. Worker failures are
+    // therefore recorded here and asserted on the test thread.
+    ConcurrentLinkedQueue<Throwable> workerFailures = new ConcurrentLinkedQueue<>();
+    //now let's query
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          SessionDataSetWrapper wrapper =
+              pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+          // closed twice on purpose -- presumably to check that a repeated close is harmless
+          pool.closeResultSet(wrapper);
+          pool.closeResultSet(wrapper);
+        } catch (Exception e) {
+          e.printStackTrace();
+          workerFailures.add(e);
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+      assertTrue("queries failed on worker threads: " + workerFailures,
+          workerFailures.isEmpty());
+      assertEquals(3, pool.currentAvailableSize());
+      assertEquals(0, pool.currentOccupiedSize());
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to whoever runs this test
+      Thread.currentThread().interrupt();
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  /**
+   * Stops the daemon while a query result is still being read, expects the read to fail with a
+   * SQLException caused by a TException, closes the result set, reactivates the daemon and then
+   * checks that the same pool passes the {@code correctQuery} scenario.
+   *
+   * <p>NOTE(review): the sixth constructor argument (1) is presumably a fetch size that forces
+   * further round trips after the first row -- confirm against SessionPool.
+   */
+  @Test
+  public void tryIfTheServerIsRestart() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i),
+            Collections.singletonList("" + i));
+      } catch (IoTDBSessionException e) {
+        fail();
+      }
+    }
+    SessionDataSetWrapper wrapper = null;
+    try {
+      wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+      EnvironmentUtils.stopDaemon();
+      //user does not know what happens.
+      while (wrapper.hasNext()) {
+        wrapper.next();
+      }
+    } catch (IoTDBRPCException | IoTDBSessionException e) {
+      e.printStackTrace();
+      fail();
+    } catch (SQLException e) {
+      // expected path: reading from the stopped server fails
+      if (e.getCause() instanceof TException) {
+        try {
+          pool.closeResultSet(wrapper);
+        } catch (SQLException ex) {
+          ex.printStackTrace();
+          fail();
+        }
+      } else {
+        fail("should be TTransportException but get an exception: " + e.getMessage());
+      }
+      // bring the server back and verify the pool is usable again
+      EnvironmentUtils.reactiveDaemon();
+      correctQuery(pool);
+      return;
+    }
+    fail("should throw exception but not");
+  }
+
+  /**
+   * Checks the pool's bookkeeping around a fully consumed query: one session is available after
+   * the inserts, it is reported as occupied while the result set is open, and it is reported
+   * as available again once {@code hasNext()} has returned false.
+   *
+   * <p>NOTE(review): despite the method name, the daemon is never stopped or restarted in this
+   * test -- confirm whether that is intended.
+   */
+  @Test
+  public void tryIfTheServerIsRestartButDataIsGotten() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i),
+            Collections.singletonList("" + i));
+      } catch (IoTDBSessionException e) {
+        fail();
+      }
+    }
+    assertEquals(1, pool.currentAvailableSize());
+    SessionDataSetWrapper wrapper = null;
+    try {
+      wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+      //user does not know what happens.
+      assertEquals(0, pool.currentAvailableSize());
+      assertEquals(1, pool.currentOccupiedSize());
+      while (wrapper.hasNext()) {
+        wrapper.next();
+      }
+      // no explicit close here: the counts below expect the session to be back in the pool
+      assertEquals(1, pool.currentAvailableSize());
+      assertEquals(0, pool.currentOccupiedSize());
+    } catch (IoTDBRPCException | IoTDBSessionException | SQLException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+}
\ No newline at end of file