You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/03/03 17:54:40 UTC

[incubator-iotdb] branch simple_pool created (now 779dd27)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch simple_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 779dd27  add a simple connection pool for session api

This branch includes the following new commits:

     new 75647e0  add a simple connection pool for session api
     new 779dd27  add a simple connection pool for session api

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 02/02: add a simple connection pool for session api

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch simple_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 779dd275acd330bcc9cbf4393109cb77b5ee23a0
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Mar 4 01:54:09 2020 +0800

    add a simple connection pool for session api
---
 session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 5956dfe..c2218fc 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -93,6 +93,7 @@ public class SessionPool {
   private int fetchSize;
 
   //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  //TODO: we can add a mechanism that if the user waits too long time, throw exception.
   private Session getSession() throws IoTDBSessionException {
     Session session = queue.poll();
     if (session != null) {


[incubator-iotdb] 01/02: add a simple connection pool for session api

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch simple_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 75647e03b45973a0ce8593dca7304ab9f96364f4
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Mar 4 01:52:56 2020 +0800

    add a simple connection pool for session api
---
 .../4-Client/2-Programming - Native API.md         |  18 +-
 .../4-Client/2-Programming - Native API.md         |  24 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   9 +
 .../java/org/apache/iotdb/session/Session.java     |   1 +
 .../org/apache/iotdb/session/SessionDataSet.java   |   4 +-
 .../iotdb/session/pool/SessionDataSetWrapper.java  |  62 ++
 .../org/apache/iotdb/session/pool/SessionPool.java | 647 +++++++++++++++++++++
 .../apache/iotdb/session/pool/SessionPoolTest.java | 231 ++++++++
 8 files changed, 992 insertions(+), 4 deletions(-)

diff --git a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
index 86d6a57..34fed0e 100644
--- a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md	
+++ b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md	
@@ -98,4 +98,20 @@
 
 浏览上述接口的详细信息,请参阅代码 ```session/src/main/java/org/apache/iotdb/session/Session.java```
 
-使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
\ No newline at end of file
+使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
+
+# 针对原生接口的连接池
+
+我们提供了一个针对原生接口的连接池(`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。
+如果超过60s都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
+
+当一个连接被用完后,他会自动返回池中等待下次被使用;
+当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+
+对于查询操作:
+
+1. 使用SessionPool进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`;
+2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`;
+3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`.
+
+使用示例可以参见 ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
index 336f711..c88c45a 100644
--- a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md	
+++ b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md	
@@ -124,4 +124,26 @@ Here we show the commonly used interfaces and their parameters in the Native API
 
 To get more information of the following interfaces, please view session/src/main/java/org/apache/iotdb/session/Session.java
 
-The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
\ No newline at end of file
+The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
+
+
+# Session Pool for Native API
+
+We provided a connection pool (`SessionPool) for Native API.
+Using the interface, you need to define the pool size.
+
+If you can not get a session connection in 60 secondes, there is a warning log but the program will hang.
+
+If a session has finished an operation, it will be put back to the pool automatically.
+If a session connection is broken, the session will be removed automatically and the pool will try 
+to create a new session and redo the operation.
+
+For query operations:
+
+1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`;
+2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop to use it,
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+3. When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+
+Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 5d7e025..8b7b884 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -224,6 +224,15 @@ public class EnvironmentUtils {
     }
   }
 
+  public static void reactiveDaemon() {
+    if (daemon == null) {
+      daemon = new IoTDB();
+      daemon.active();
+    } else {
+      activeDaemon();
+    }
+  }
+
   private static void createAllDir() {
     // create sequential files
     for (String path : directoryManager.getAllSequenceFileFolders()) {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2db5923..157d2ec 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -658,4 +658,5 @@ public class Session {
     }
   }
 
+
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index aeee91e..311833a 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -219,10 +219,10 @@ public class SessionDataSet {
       TSStatus closeResp = client.closeOperation(closeReq);
       RpcUtils.verifySuccess(closeResp);
     } catch (IoTDBRPCException e) {
-      throw new SQLException("Error occurs for close opeation in server side. The reason is " + e);
+      throw new SQLException("Error occurs for close opeation in server side. The reason is " + e, e);
     } catch (TException e) {
       throw new SQLException(
-          "Error occurs when connecting to server for close operation, because: " + e);
+          "Error occurs when connecting to server for close operation, because: " + e, e);
     }
   }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
new file mode 100644
index 0000000..29fa5ec
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+public class SessionDataSetWrapper {
+  SessionDataSet sessionDataSet;
+  Session session;
+  SessionPool pool;
+
+  public SessionDataSetWrapper(SessionDataSet sessionDataSet,
+      Session session, SessionPool pool) {
+    this.sessionDataSet = sessionDataSet;
+    this.session = session;
+    this.pool = pool;
+  }
+
+  protected Session getSession() {
+    return session;
+  }
+
+  public int getBatchSize() {
+    return sessionDataSet.getBatchSize();
+  }
+
+  public void setBatchSize(int batchSize) {
+    sessionDataSet.setBatchSize(batchSize);
+  }
+
+  public boolean hasNext() throws SQLException, IoTDBRPCException {
+    boolean next = sessionDataSet.hasNext();
+    if (!next) {
+      pool.closeResultSet(this);
+    }
+    return next;
+  }
+
+  public RowRecord next() throws SQLException, IoTDBRPCException {
+    return sessionDataSet.next();
+  }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
new file mode 100644
index 0000000..5956dfe
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+  }
+
+  public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+    this.maxSize = maxSize;
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    this.fetchSize = fetchSize;
+  }
+
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+
+  private int fetchSize;
+
+  //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+  private Session getSession() throws IoTDBSessionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    } else {
+      synchronized (this) {
+        long start = System.currentTimeMillis();
+        while (session == null) {
+          if (size < maxSize) {
+            //we can create more session
+            size++;
+            //but we do it after skip synchronized block because connection a session is time consuming.
+            break;
+          } else {
+            //we have to wait for someone returns a session.
+            try {
+              this.wait(1000);
+              if (System.currentTimeMillis() - start > 60_000) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        if (session != null) {
+          return session;
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+      }
+      session = new Session(ip, port, user, password, fetchSize);
+      session.open();
+      return session;
+    }
+  }
+
+  public int currentAvailableSize() {
+    return queue.size();
+  }
+
+  public int currentOccupiedSize() {
+    return occupied.size();
+  }
+
+  private void putBack(Session session) {
+    queue.push(session);
+    synchronized (this) {
+      this.notifyAll();
+    }
+  }
+
+  private void occupy(Session session) {
+    occupied.put(session, session);
+  }
+
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+    boolean putback = true;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        // the connection is broken.
+        removeSession();
+        putback = false;
+      } else {
+        throw e;
+      }
+    } finally {
+      Session session = occupied.remove(wrapper.session);
+      if (putback && session != null) {
+        putBack(wrapper.session);
+      }
+    }
+  }
+
+  private synchronized void removeSession() {
+    if (logger.isDebugEnabled()) {
+      logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+    }
+    size--;
+  }
+
+  private void closeSession(Session session) {
+    if (session != null) {
+      try {
+        session.close();
+      } catch (Exception e2) {
+        //do nothing. We just want to guarantee the session is closed.
+      }
+    }
+  }
+
+  /**
+   * use batch interface to insert sorted data times in row batch must be sorted before!
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
+      throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSExecuteBatchStatementResp resp = session.insertSortedBatch(rowBatch);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * use batch interface to insert data
+   *
+   * @param rowBatch data batch
+   */
+  public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSExecuteBatchStatementResp resp = session.insertBatch(rowBatch);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * Insert data in batch format, which can reduce the overhead of network. This method is just like
+   * jdbc batch insert, we pack some insert request in batch and send them to server If you want
+   * improve your performance, please see insertBatch method
+   *
+   * @see Session#insertBatch(RowBatch)
+   */
+  public List<TSStatus> insertInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        List<TSStatus> resp = session.insertInBatch(deviceIds, times, measurementsList, valuesList);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * insert data in one row, if you want improve your performance, please use insertInBatch method
+   * or insertBatch method
+   *
+   * @see Session#insertInBatch(List, List, List, List)
+   * @see Session#insertBatch(RowBatch)
+   */
+  public TSStatus insert(String deviceId, long time, List<String> measurements, List<String> values)
+      throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.insert(deviceId, time, measurements, values);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * This method NOT insert data into database and the server just return after accept the request,
+   * this method should be used to test other time cost in client
+   */
+  public TSExecuteBatchStatementResp testInsertBatch(RowBatch rowBatch)
+      throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSExecuteBatchStatementResp resp = session.testInsertBatch(rowBatch);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * This method NOT insert data into database and the server just return after accept the request,
+   * this method should be used to test other time cost in client
+   */
+  public List<TSStatus> testInsertInBatch(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        List<TSStatus> resp = session
+            .testInsertInBatch(deviceIds, times, measurementsList, valuesList);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * This method NOT insert data into database and the server just return after accept the request,
+   * this method should be used to test other time cost in client
+   */
+  public TSStatus testInsert(String deviceId, long time, List<String> measurements,
+      List<String> values) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.testInsert(deviceId, time, measurements, values);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * delete a timeseries, including data and schema
+   *
+   * @param path timeseries to delete, should be a whole path
+   */
+  public TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.deleteTimeseries(path);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * delete a timeseries, including data and schema
+   *
+   * @param paths timeseries to delete, should be a whole path
+   */
+  public TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.deleteTimeseries(paths);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * delete data <= time in one timeseries
+   *
+   * @param path data in which time series to delete
+   * @param time data with time stamp less than or equal to time will be deleted
+   */
+  public TSStatus deleteData(String path, long time) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.deleteData(path, time);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * delete data <= time in multiple timeseries
+   *
+   * @param paths data in which time series to delete
+   * @param time data with time stamp less than or equal to time will be deleted
+   */
+  public TSStatus deleteData(List<String> paths, long time) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.deleteData(paths, time);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  public TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.setStorageGroup(storageGroupId);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  public TSStatus deleteStorageGroup(String storageGroup) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.deleteStorageGroup(storageGroup);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  public TSStatus deleteStorageGroups(List<String> storageGroup) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.deleteStorageGroups(storageGroup);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  public TSStatus createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        TSStatus resp = session.createTimeseries(path, dataType, encoding, compressor);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  public boolean checkTimeseriesExists(String path) throws IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        boolean resp = session.checkTimeseriesExists(path);
+        putBack(session);
+        return resp;
+      } catch (IoTDBSessionException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * execure query sql users must call closeResultSet(SessionDataSetWrapper) if they do not use the
+   * SessionDataSet any more. users do not need to call sessionDataSet.closeOpeationHandler() any
+   * more.
+   *
+   * @param sql query statement
+   * @return result set Notice that you must get the result instance. Otherwise a data leakage will happen
+   */
+  public SessionDataSetWrapper executeQueryStatement(String sql)
+      throws IoTDBRPCException, IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        SessionDataSet resp = session.executeQueryStatement(sql);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        occupy(session);
+        return wrapper;
+      } catch (TException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (IoTDBRPCException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+
+  /**
+   * execute non query statement
+   *
+   * @param sql non query statement
+   */
+  public void executeNonQueryStatement(String sql)
+      throws IoTDBRPCException, IoTDBSessionException {
+    while (true) {
+      Session session = getSession();
+      try {
+        session.executeNonQueryStatement(sql);
+        putBack(session);
+        return;
+      } catch (TException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (IoTDBRPCException e) {
+        if (e.getCause() instanceof TException) {
+          // TException means the connection is broken, remove it and get a new one.
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+          throw e;
+        }
+      }
+    }
+  }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
new file mode 100644
index 0000000..5114b28
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -0,0 +1,231 @@
+package org.apache.iotdb.session.pool;
+
+import static org.junit.Assert.*;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+//this test is not for testing the correctness of Session API. So we just implement one of the API.
+public class SessionPoolTest {
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+
+  @Test
+  public void insert() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          pool.insert("root.sg1.d1", 1, Collections.singletonList("s" + no), Collections.singletonList("3"));
+        } catch (IoTDBSessionException e) {
+          fail();
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+    assertEquals(3, pool.currentAvailableSize());
+    assertEquals(0, pool.currentOccupiedSize());
+  }
+
+  @Test
+  public void incorrectSQL() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    assertEquals(0, pool.currentAvailableSize());
+    try {
+      pool.insert(".root.sg1.d1", 1, Collections.singletonList("s" ), Collections.singletonList("3"));
+    } catch (IoTDBSessionException e) {
+      //do nothing
+    }
+    assertEquals(1, pool.currentAvailableSize());
+  }
+
+
+  @Test
+  public void incorrectExecuteQueryStatement() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+      } catch (IoTDBSessionException e) {
+        fail();
+      }
+    }
+    //now let's query
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          SessionDataSetWrapper wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+          //this is incorrect becasue wrapper is not closed.
+          //so all other 7 queries will be blocked
+        } catch (IoTDBSessionException e) {
+          fail();
+        } catch (IoTDBRPCException e) {
+          e.printStackTrace();
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertFalse(service.awaitTermination(3, TimeUnit.SECONDS));
+      assertEquals(0, pool.currentAvailableSize());
+      assertEquals(3, pool.currentOccupiedSize());
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void executeQueryStatement() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+    correctQuery(pool);
+  }
+
+  private void correctQuery(SessionPool pool) {
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+      } catch (IoTDBSessionException e) {
+        fail();
+      }
+    }
+    //now let's query
+    for (int i = 0; i < 10; i++) {
+      final int no = i;
+      service.submit(() -> {
+        try {
+          SessionDataSetWrapper wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+          pool.closeResultSet(wrapper);
+          pool.closeResultSet(wrapper);
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail();
+        }
+      });
+    }
+    service.shutdown();
+    try {
+      assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+      assertEquals(3, pool.currentAvailableSize());
+      assertEquals(0, pool.currentOccupiedSize());
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  @Test
+  public void tryIfTheServerIsRestart() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1);
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+      } catch (IoTDBSessionException e) {
+        fail();
+      }
+    }
+    SessionDataSetWrapper wrapper = null;
+    try {
+      wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+      EnvironmentUtils.stopDaemon();
+      //user does not know what happens.
+      while(wrapper.hasNext()) {
+        wrapper.next();
+      }
+    } catch (IoTDBRPCException e) {
+      e.printStackTrace();
+      fail();
+    } catch (IoTDBSessionException e) {
+      e.printStackTrace();
+      fail();
+    } catch (SQLException e) {
+      if (e.getCause() instanceof TException) {
+        try {
+          pool.closeResultSet(wrapper);
+        } catch (SQLException ex) {
+          ex.printStackTrace();
+          fail();
+        }
+      } else {
+        fail("should be TTransportException but get an exception: " + e.getMessage());
+      }
+      EnvironmentUtils.reactiveDaemon();
+      correctQuery(pool);
+      return;
+    }
+    fail("should throw exception but not");
+  }
+
+  @Test
+  public void tryIfTheServerIsRestartButDataIsGotten() {
+    SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1);
+    ExecutorService service = Executors.newFixedThreadPool(10);
+    for (int i = 0; i < 10; i++) {
+      try {
+        pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+      } catch (IoTDBSessionException e) {
+        fail();
+      }
+    }
+    assertEquals(1, pool.currentAvailableSize());
+    SessionDataSetWrapper wrapper = null;
+    try {
+      wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+      //user does not know what happens.
+      assertEquals(0, pool.currentAvailableSize());
+      assertEquals(1, pool.currentOccupiedSize());
+      while(wrapper.hasNext()) {
+        wrapper.next();
+      }
+      assertEquals(1, pool.currentAvailableSize());
+      assertEquals(0, pool.currentOccupiedSize());
+    } catch (IoTDBRPCException e) {
+      e.printStackTrace();
+      fail();
+    } catch (IoTDBSessionException e) {
+      e.printStackTrace();
+      fail();
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+}
\ No newline at end of file