You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/03/03 17:54:41 UTC
[incubator-iotdb] 01/02: add a simple connection pool for session
api
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch simple_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 75647e03b45973a0ce8593dca7304ab9f96364f4
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Mar 4 01:52:56 2020 +0800
add a simple connection pool for session api
---
.../4-Client/2-Programming - Native API.md | 18 +-
.../4-Client/2-Programming - Native API.md | 24 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 9 +
.../java/org/apache/iotdb/session/Session.java | 1 +
.../org/apache/iotdb/session/SessionDataSet.java | 4 +-
.../iotdb/session/pool/SessionDataSetWrapper.java | 62 ++
.../org/apache/iotdb/session/pool/SessionPool.java | 647 +++++++++++++++++++++
.../apache/iotdb/session/pool/SessionPoolTest.java | 231 ++++++++
8 files changed, 992 insertions(+), 4 deletions(-)
diff --git a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
index 86d6a57..34fed0e 100644
--- a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
+++ b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
@@ -98,4 +98,20 @@
浏览上述接口的详细信息,请参阅代码 ```session/src/main/java/org/apache/iotdb/session/Session.java```
-使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
\ No newline at end of file
+使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
+
+# 针对原生接口的连接池
+
+我们提供了一个针对原生接口的连接池(`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。
+如果超过60s都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
+
+当一个连接被用完后,他会自动返回池中等待下次被使用;
+当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+
+对于查询操作:
+
+1. 使用SessionPool进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`;
+2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`;
+3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`.
+
+使用示例可以参见 ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
index 336f711..c88c45a 100644
--- a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
+++ b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
@@ -124,4 +124,26 @@ Here we show the commonly used interfaces and their parameters in the Native API
To get more information of the following interfaces, please view session/src/main/java/org/apache/iotdb/session/Session.java
-The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
\ No newline at end of file
+The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
+
+
+# Session Pool for Native API
+
+We provided a connection pool (`SessionPool) for Native API.
+Using the interface, you need to define the pool size.
+
+If you can not get a session connection in 60 secondes, there is a warning log but the program will hang.
+
+If a session has finished an operation, it will be put back to the pool automatically.
+If a session connection is broken, the session will be removed automatically and the pool will try
+to create a new session and redo the operation.
+
+For query operations:
+
+1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`;
+2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop to use it,
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+3. When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+
+Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 5d7e025..8b7b884 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -224,6 +224,15 @@ public class EnvironmentUtils {
}
}
+ public static void reactiveDaemon() {
+ if (daemon == null) {
+ daemon = new IoTDB();
+ daemon.active();
+ } else {
+ activeDaemon();
+ }
+ }
+
private static void createAllDir() {
// create sequential files
for (String path : directoryManager.getAllSequenceFileFolders()) {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 2db5923..157d2ec 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -658,4 +658,5 @@ public class Session {
}
}
+
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index aeee91e..311833a 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -219,10 +219,10 @@ public class SessionDataSet {
TSStatus closeResp = client.closeOperation(closeReq);
RpcUtils.verifySuccess(closeResp);
} catch (IoTDBRPCException e) {
- throw new SQLException("Error occurs for close opeation in server side. The reason is " + e);
+ throw new SQLException("Error occurs for close opeation in server side. The reason is " + e, e);
} catch (TException e) {
throw new SQLException(
- "Error occurs when connecting to server for close operation, because: " + e);
+ "Error occurs when connecting to server for close operation, because: " + e, e);
}
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
new file mode 100644
index 0000000..29fa5ec
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+public class SessionDataSetWrapper {
+ SessionDataSet sessionDataSet;
+ Session session;
+ SessionPool pool;
+
+ public SessionDataSetWrapper(SessionDataSet sessionDataSet,
+ Session session, SessionPool pool) {
+ this.sessionDataSet = sessionDataSet;
+ this.session = session;
+ this.pool = pool;
+ }
+
+ protected Session getSession() {
+ return session;
+ }
+
+ public int getBatchSize() {
+ return sessionDataSet.getBatchSize();
+ }
+
+ public void setBatchSize(int batchSize) {
+ sessionDataSet.setBatchSize(batchSize);
+ }
+
+ public boolean hasNext() throws SQLException, IoTDBRPCException {
+ boolean next = sessionDataSet.hasNext();
+ if (!next) {
+ pool.closeResultSet(this);
+ }
+ return next;
+ }
+
+ public RowRecord next() throws SQLException, IoTDBRPCException {
+ return sessionDataSet.next();
+ }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
new file mode 100644
index 0000000..5956dfe
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the borken
+ * session connection and create a new one.
+ *
+ * If there is no available connections and the pool reaches its max size, the all methods will hang
+ * until there is a available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ *
+ */
+public class SessionPool {
+
+ private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+ private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+ //for session whose resultSet is not released.
+ private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+
+ public SessionPool(String ip, int port, String user, String password, int maxSize) {
+ this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE);
+ }
+
+ public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize) {
+ this.maxSize = maxSize;
+ this.ip = ip;
+ this.port = port;
+ this.user = user;
+ this.password = password;
+ this.fetchSize = fetchSize;
+ }
+
+ private int size = 0;
+ private int maxSize = 0;
+ private String ip;
+ private int port;
+ private String user;
+ private String password;
+
+ private int fetchSize;
+
+ //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+ private Session getSession() throws IoTDBSessionException {
+ Session session = queue.poll();
+ if (session != null) {
+ return session;
+ } else {
+ synchronized (this) {
+ long start = System.currentTimeMillis();
+ while (session == null) {
+ if (size < maxSize) {
+ //we can create more session
+ size++;
+ //but we do it after skip synchronized block because connection a session is time consuming.
+ break;
+ } else {
+ //we have to wait for someone returns a session.
+ try {
+ this.wait(1000);
+ if (System.currentTimeMillis() - start > 60_000) {
+ logger.warn(
+ "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+ (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+ }
+ } catch (InterruptedException e) {
+ logger.error("the SessionPool is damaged", e);
+ Thread.currentThread().interrupt();
+ }
+ session = queue.poll();
+ }
+ }
+ if (session != null) {
+ return session;
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+ }
+ session = new Session(ip, port, user, password, fetchSize);
+ session.open();
+ return session;
+ }
+ }
+
+ public int currentAvailableSize() {
+ return queue.size();
+ }
+
+ public int currentOccupiedSize() {
+ return occupied.size();
+ }
+
+ private void putBack(Session session) {
+ queue.push(session);
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ private void occupy(Session session) {
+ occupied.put(session, session);
+ }
+
+ public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+ boolean putback = true;
+ try {
+ wrapper.sessionDataSet.closeOperationHandle();
+ } catch (SQLException e) {
+ if (e.getCause() instanceof TException) {
+ // the connection is broken.
+ removeSession();
+ putback = false;
+ } else {
+ throw e;
+ }
+ } finally {
+ Session session = occupied.remove(wrapper.session);
+ if (putback && session != null) {
+ putBack(wrapper.session);
+ }
+ }
+ }
+
+ private synchronized void removeSession() {
+ if (logger.isDebugEnabled()) {
+ logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+ }
+ size--;
+ }
+
+ private void closeSession(Session session) {
+ if (session != null) {
+ try {
+ session.close();
+ } catch (Exception e2) {
+ //do nothing. We just want to guarantee the session is closed.
+ }
+ }
+ }
+
+ /**
+ * use batch interface to insert sorted data times in row batch must be sorted before!
+ *
+ * @param rowBatch data batch
+ */
+ public TSExecuteBatchStatementResp insertSortedBatch(RowBatch rowBatch)
+ throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSExecuteBatchStatementResp resp = session.insertSortedBatch(rowBatch);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+
+
+ /**
+ * use batch interface to insert data
+ *
+ * @param rowBatch data batch
+ */
+ public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSExecuteBatchStatementResp resp = session.insertBatch(rowBatch);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * Insert data in batch format, which can reduce the overhead of network. This method is just like
+ * jdbc batch insert, we pack some insert request in batch and send them to server If you want
+ * improve your performance, please see insertBatch method
+ *
+ * @see Session#insertBatch(RowBatch)
+ */
+ public List<TSStatus> insertInBatch(List<String> deviceIds, List<Long> times,
+ List<List<String>> measurementsList, List<List<String>> valuesList)
+ throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ List<TSStatus> resp = session.insertInBatch(deviceIds, times, measurementsList, valuesList);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * insert data in one row, if you want improve your performance, please use insertInBatch method
+ * or insertBatch method
+ *
+ * @see Session#insertInBatch(List, List, List, List)
+ * @see Session#insertBatch(RowBatch)
+ */
+ public TSStatus insert(String deviceId, long time, List<String> measurements, List<String> values)
+ throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.insert(deviceId, time, measurements, values);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * This method NOT insert data into database and the server just return after accept the request,
+ * this method should be used to test other time cost in client
+ */
+ public TSExecuteBatchStatementResp testInsertBatch(RowBatch rowBatch)
+ throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSExecuteBatchStatementResp resp = session.testInsertBatch(rowBatch);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * This method NOT insert data into database and the server just return after accept the request,
+ * this method should be used to test other time cost in client
+ */
+ public List<TSStatus> testInsertInBatch(List<String> deviceIds, List<Long> times,
+ List<List<String>> measurementsList, List<List<String>> valuesList)
+ throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ List<TSStatus> resp = session
+ .testInsertInBatch(deviceIds, times, measurementsList, valuesList);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * This method NOT insert data into database and the server just return after accept the request,
+ * this method should be used to test other time cost in client
+ */
+ public TSStatus testInsert(String deviceId, long time, List<String> measurements,
+ List<String> values) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.testInsert(deviceId, time, measurements, values);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * delete a timeseries, including data and schema
+ *
+ * @param path timeseries to delete, should be a whole path
+ */
+ public TSStatus deleteTimeseries(String path) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.deleteTimeseries(path);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * delete a timeseries, including data and schema
+ *
+ * @param paths timeseries to delete, should be a whole path
+ */
+ public TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.deleteTimeseries(paths);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * delete data <= time in one timeseries
+ *
+ * @param path data in which time series to delete
+ * @param time data with time stamp less than or equal to time will be deleted
+ */
+ public TSStatus deleteData(String path, long time) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.deleteData(path, time);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * delete data <= time in multiple timeseries
+ *
+ * @param paths data in which time series to delete
+ * @param time data with time stamp less than or equal to time will be deleted
+ */
+ public TSStatus deleteData(List<String> paths, long time) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.deleteData(paths, time);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ public TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.setStorageGroup(storageGroupId);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ public TSStatus deleteStorageGroup(String storageGroup) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.deleteStorageGroup(storageGroup);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ public TSStatus deleteStorageGroups(List<String> storageGroup) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.deleteStorageGroups(storageGroup);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ public TSStatus createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
+ CompressionType compressor) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.createTimeseries(path, dataType, encoding, compressor);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ public boolean checkTimeseriesExists(String path) throws IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ boolean resp = session.checkTimeseriesExists(path);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * execure query sql users must call closeResultSet(SessionDataSetWrapper) if they do not use the
+ * SessionDataSet any more. users do not need to call sessionDataSet.closeOpeationHandler() any
+ * more.
+ *
+ * @param sql query statement
+ * @return result set Notice that you must get the result instance. Otherwise a data leakage will happen
+ */
+ public SessionDataSetWrapper executeQueryStatement(String sql)
+ throws IoTDBRPCException, IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ SessionDataSet resp = session.executeQueryStatement(sql);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (TException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (IoTDBRPCException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+
+ /**
+ * execute non query statement
+ *
+ * @param sql non query statement
+ */
+ public void executeNonQueryStatement(String sql)
+ throws IoTDBRPCException, IoTDBSessionException {
+ while (true) {
+ Session session = getSession();
+ try {
+ session.executeNonQueryStatement(sql);
+ putBack(session);
+ return;
+ } catch (TException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (IoTDBRPCException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
new file mode 100644
index 0000000..5114b28
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -0,0 +1,231 @@
+package org.apache.iotdb.session.pool;
+
+import static org.junit.Assert.*;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+//this test is not for testing the correctness of Session API. So we just implement one of the API.
+public class SessionPoolTest {
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+
+ @Test
+ public void insert() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ pool.insert("root.sg1.d1", 1, Collections.singletonList("s" + no), Collections.singletonList("3"));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ assertEquals(3, pool.currentAvailableSize());
+ assertEquals(0, pool.currentOccupiedSize());
+ }
+
+ @Test
+ public void incorrectSQL() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ assertEquals(0, pool.currentAvailableSize());
+ try {
+ pool.insert(".root.sg1.d1", 1, Collections.singletonList("s" ), Collections.singletonList("3"));
+ } catch (IoTDBSessionException e) {
+ //do nothing
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ }
+
+
+ @Test
+ public void incorrectExecuteQueryStatement() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ }
+ //now let's query
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ SessionDataSetWrapper wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+ //this is incorrect becasue wrapper is not closed.
+ //so all other 7 queries will be blocked
+ } catch (IoTDBSessionException e) {
+ fail();
+ } catch (IoTDBRPCException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertFalse(service.awaitTermination(3, TimeUnit.SECONDS));
+ assertEquals(0, pool.currentAvailableSize());
+ assertEquals(3, pool.currentOccupiedSize());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void executeQueryStatement() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ correctQuery(pool);
+ }
+
+ private void correctQuery(SessionPool pool) {
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ }
+ //now let's query
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ SessionDataSetWrapper wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+ pool.closeResultSet(wrapper);
+ pool.closeResultSet(wrapper);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ assertEquals(3, pool.currentAvailableSize());
+ assertEquals(0, pool.currentOccupiedSize());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void tryIfTheServerIsRestart() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ }
+ SessionDataSetWrapper wrapper = null;
+ try {
+ wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+ EnvironmentUtils.stopDaemon();
+ //user does not know what happens.
+ while(wrapper.hasNext()) {
+ wrapper.next();
+ }
+ } catch (IoTDBRPCException e) {
+ e.printStackTrace();
+ fail();
+ } catch (IoTDBSessionException e) {
+ e.printStackTrace();
+ fail();
+ } catch (SQLException e) {
+ if (e.getCause() instanceof TException) {
+ try {
+ pool.closeResultSet(wrapper);
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ fail();
+ }
+ } else {
+ fail("should be TTransportException but get an exception: " + e.getMessage());
+ }
+ EnvironmentUtils.reactiveDaemon();
+ correctQuery(pool);
+ return;
+ }
+ fail("should throw exception but not");
+ }
+
+ @Test
+ public void tryIfTheServerIsRestartButDataIsGotten() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ SessionDataSetWrapper wrapper = null;
+ try {
+ wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+ //user does not know what happens.
+ assertEquals(0, pool.currentAvailableSize());
+ assertEquals(1, pool.currentOccupiedSize());
+ while(wrapper.hasNext()) {
+ wrapper.next();
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ assertEquals(0, pool.currentOccupiedSize());
+ } catch (IoTDBRPCException e) {
+ e.printStackTrace();
+ fail();
+ } catch (IoTDBSessionException e) {
+ e.printStackTrace();
+ fail();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+}
\ No newline at end of file