You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/14 05:18:32 UTC
[incubator-iotdb] branch master updated: [IOTDB-538]add a simple
connection pool for session api (#880)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f9fd91f [IOTDB-538]add a simple connection pool for session api (#880)
f9fd91f is described below
commit f9fd91f80a53c978e32e26c232f24ad9254c9753
Author: Xiangdong Huang <hx...@qq.com>
AuthorDate: Sat Mar 14 13:18:21 2020 +0800
[IOTDB-538]add a simple connection pool for session api (#880)
* add a simple connection pool for session api
---
.../4-Client/2-Programming - Native API.md | 19 +-
.../4-Client/2-Programming - Native API.md | 25 +-
.../main/java/org/apache/iotdb/SessionExample.java | 1 +
.../apache/iotdb/db/utils/EnvironmentUtils.java | 9 +
.../org/apache/iotdb/session/SessionDataSet.java | 10 +-
.../iotdb/session/pool/SessionDataSetWrapper.java | 81 +++
.../org/apache/iotdb/session/pool/SessionPool.java | 669 +++++++++++++++++++++
.../apache/iotdb/session/pool/SessionPoolTest.java | 236 ++++++++
8 files changed, 1047 insertions(+), 3 deletions(-)
diff --git a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
index 25bb3ee..c434ba0 100644
--- a/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
+++ b/docs/Documentation-CHN/UserGuide/4-Client/2-Programming - Native API.md
@@ -100,4 +100,21 @@
浏览上述接口的详细信息,请参阅代码 ```session/src/main/java/org/apache/iotdb/session/Session.java```
-使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
\ No newline at end of file
+使用上述接口的示例代码在 ```example/session/src/main/java/org/apache/iotdb/SessionExample.java```
+
+# 针对原生接口的连接池
+
+我们提供了一个针对原生接口的连接池(`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。
+如果超过60s都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
+
+当一个连接被用完后,他会自动返回池中等待下次被使用;
+当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+
+对于查询操作:
+
+1. 使用SessionPool进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`;
+2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`;
+3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`.
+4. 可以调用 `SessionDataSetWrapper` 的 `getColumnNames()` 方法得到结果集列名
+
+使用示例可以参见 ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
index 336f711..ac72ebe 100644
--- a/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
+++ b/docs/Documentation/UserGuide/4-Client/2-Programming - Native API.md
@@ -124,4 +124,27 @@ Here we show the commonly used interfaces and their parameters in the Native API
To get more information of the following interfaces, please view session/src/main/java/org/apache/iotdb/session/Session.java
-The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
\ No newline at end of file
+The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
+
+
+# Session Pool for Native API
+
+We provide a connection pool (`SessionPool`) for the Native API.
+Using the interface, you need to define the pool size.
+
+If you cannot get a session connection within 60 seconds, a warning is logged but the program keeps waiting.
+
+If a session has finished an operation, it will be put back to the pool automatically.
+If a session connection is broken, the session will be removed automatically and the pool will try
+to create a new session and redo the operation.
+
+For query operations:
+
+1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`;
+2. Given a `SessionDataSetWrapper`, if you have not scanned all the data in it and stop using it,
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+3. When you call `hasNext()` and `next()` of a `SessionDataSetWrapper` and there is an exception, then
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+4. You can call `getColumnNames()` of `SessionDataSetWrapper` to get the column names of query result;
+
+Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
\ No newline at end of file
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 280e2b6..5304cde 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -241,6 +241,7 @@ public class SessionExample {
private static void query() throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet;
dataSet = session.executeQueryStatement("select * from root.sg1.d1");
+ System.out.println(dataSet.getColumnNames());
dataSet.setBatchSize(1024); // default is 512
while (dataSet.hasNext()){
System.out.println(dataSet.next());
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index d3df4a0..f0a1add 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -222,6 +222,15 @@ public class EnvironmentUtils {
}
}
+ public static void reactiveDaemon() {
+ if (daemon == null) {
+ daemon = new IoTDB();
+ daemon.active();
+ } else {
+ activeDaemon();
+ }
+ }
+
private static void createAllDir() {
// create sequential files
for (String path : directoryManager.getAllSequenceFileFolders()) {
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index cb5540f..763922d 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -49,6 +49,7 @@ public class SessionDataSet {
private long sessionId;
private TSIService.Iface client;
private int batchSize = 1024;
+ private List<String> columnNameList;
private List<String> columnTypeDeduplicatedList;
// duplicated column index -> origin index
Map<Integer, Integer> duplicateLocation;
@@ -71,6 +72,7 @@ public class SessionDataSet {
this.sql = sql;
this.queryId = queryId;
this.client = client;
+ this.columnNameList = columnNameList;
currentBitmap = new byte[columnNameList.size()];
columnSize = columnNameList.size();
@@ -101,6 +103,10 @@ public class SessionDataSet {
this.batchSize = batchSize;
}
+ public List<String> getColumnNames() {
+ return columnNameList;
+ }
+
public boolean hasNext() throws IoTDBConnectionException, StatementExecutionException {
if (hasCachedRecord) {
return true;
@@ -129,6 +135,8 @@ public class SessionDataSet {
return true;
}
+
+
private void constructOneRow() {
List<Field> outFields = new ArrayList<>();
int loc = 0;
@@ -221,7 +229,7 @@ public class SessionDataSet {
RpcUtils.verifySuccess(closeResp);
} catch (TException e) {
throw new IoTDBConnectionException(
- "Error occurs when connecting to server for close operation, because: " + e);
+ "Error occurs when connecting to server for close operation, because: " + e, e);
}
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
new file mode 100644
index 0000000..c1cc7e2
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.util.List;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+public class SessionDataSetWrapper {
+
+ SessionDataSet sessionDataSet;
+ Session session;
+ SessionPool pool;
+
+ public SessionDataSetWrapper(SessionDataSet sessionDataSet,
+ Session session, SessionPool pool) {
+ this.sessionDataSet = sessionDataSet;
+ this.session = session;
+ this.pool = pool;
+ }
+
+ protected Session getSession() {
+ return session;
+ }
+
+ public int getBatchSize() {
+ return sessionDataSet.getBatchSize();
+ }
+
+ public void setBatchSize(int batchSize) {
+ sessionDataSet.setBatchSize(batchSize);
+ }
+
+ /**
+ * If there is an Exception, and you do not want to use the resultset anymore,
+ * you have to release the resultset manually by calling closeResultSet
+ * @return
+ * @throws IoTDBConnectionException
+ * @throws StatementExecutionException
+ */
+ public boolean hasNext() throws IoTDBConnectionException, StatementExecutionException {
+ boolean next = sessionDataSet.hasNext();
+ if (!next) {
+ pool.closeResultSet(this);
+ }
+ return next;
+ }
+ /**
+ * If there is an Exception, and you do not want to use the resultset anymore,
+ * you have to release the resultset manually by calling closeResultSet
+ * @return
+ * @throws IoTDBConnectionException
+ * @throws StatementExecutionException
+ */
+ public RowRecord next() throws IoTDBConnectionException, StatementExecutionException {
+ return sessionDataSet.next();
+ }
+
+ public List<String> getColumnNames() {
+ return sessionDataSet.getColumnNames();
+ }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
new file mode 100644
index 0000000..1f7b579
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.BatchExecutionException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.Config;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set.
+ * Using SessionPool, the user do not need to consider how to reuse a session connection.
+ * Even if the session is disconnected, the session pool can recognize it and remove the broken
+ * session connection and create a new one.
+ *
+ * If there are no available connections and the pool reaches its max size, all methods will hang
+ * until there is an available connection.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have got all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you get all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection is occupied by the query.
+ *
+ * Another case that you have to manually call closeResultSet() is that when there is exception
+ * when you call SessionDataSetWrapper.hasNext() or next()
+ *
+ */
+public class SessionPool {
+
+ private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+ private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+ //for session whose resultSet is not released.
+ private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+ private int size = 0;
+ private int maxSize = 0;
+ private String ip;
+ private int port;
+ private String user;
+ private String password;
+
+ private int fetchSize;
+
+ private long timeout; //ms
+ private static int RETRY = 3;
+
+ public SessionPool(String ip, int port, String user, String password, int maxSize) {
+ this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE, 60_000);
+ }
+
+ public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize,
+ long timeout) {
+ this.maxSize = maxSize;
+ this.ip = ip;
+ this.port = port;
+ this.user = user;
+ this.password = password;
+ this.fetchSize = fetchSize;
+ this.timeout = timeout;
+ }
+
+  /**
+   * Obtains a session for one operation: an idle pooled session if there is one, otherwise a
+   * newly opened session while fewer than {@code maxSize} sessions exist, otherwise whichever
+   * session another thread returns to the pool first.
+   *
+   * @return a session taken from the idle queue, or a newly created session on which
+   *     {@code open()} has been called
+   * @throws IoTDBConnectionException if opening a new session fails (either the server is broken,
+   *     or the ip/port/user/password is incorrect), if no session becomes available within
+   *     {@code timeout} milliseconds, or if the waiting thread is interrupted
+   */
+  private Session getSession() throws IoTDBConnectionException {
+    Session session = queue.poll();
+    if (session != null) {
+      return session;
+    }
+    synchronized (this) {
+      long start = System.currentTimeMillis();
+      while (session == null) {
+        if (size < maxSize) {
+          // Reserve a slot now, but connect after leaving the synchronized block because
+          // opening a session is time consuming.
+          size++;
+          break;
+        }
+        // The pool is full: we have to wait for someone to return a session.
+        try {
+          this.wait(1000);
+        } catch (InterruptedException e) {
+          // Restore the flag and give up: with the flag set, every further wait() would throw
+          // immediately and this loop would spin without ever reaching the timeout check.
+          Thread.currentThread().interrupt();
+          throw new IoTDBConnectionException(
+              String.format("interrupted while waiting for a connection from %s:%s", ip, port), e);
+        }
+        session = queue.poll();
+        if (session == null) {
+          long waited = System.currentTimeMillis() - start;
+          if (waited > 60_000) {
+            // The password is deliberately left out of the log message.
+            logger.warn(
+                "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}",
+                waited / 1000, ip, port, user);
+          }
+          // Checked independently of the warning so that a timeout below 60s is honoured too.
+          if (waited > timeout) {
+            throw new IoTDBConnectionException(
+                String.format("timeout to get a connection from %s:%s", ip, port));
+          }
+        }
+      }
+    }
+    if (session != null) {
+      return session;
+    }
+    logger.debug("Create a new Session {}, {}, {}", ip, port, user);
+    session = new Session(ip, port, user, password, fetchSize);
+    boolean opened = false;
+    try {
+      session.open();
+      opened = true;
+    } finally {
+      if (!opened) {
+        // Give the reserved slot back; otherwise every failed connect would shrink the pool
+        // until no session could ever be created again.
+        synchronized (this) {
+          size--;
+          this.notifyAll();
+        }
+      }
+    }
+    return session;
+  }
+ }
+
+ public int currentAvailableSize() {
+ return queue.size();
+ }
+
+ public int currentOccupiedSize() {
+ return occupied.size();
+ }
+
+ private void putBack(Session session) {
+ queue.push(session);
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ private void occupy(Session session) {
+ occupied.put(session, session);
+ }
+
+ /**
+ * close all connections in the pool
+ */
+ public synchronized void close() {
+ for (Session session : queue) {
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ //do nothing
+ }
+ }
+ for (Session session : occupied.keySet()) {
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ //do nothing
+ }
+ }
+ queue.clear();
+ occupied.clear();
+ }
+
+  /**
+   * Releases the session held by a query result: closes the operation handle of the wrapped
+   * data set and puts the session back into the pool. If closing the handle fails with a
+   * connection error, the session is closed and its pool slot is freed instead.
+   *
+   * <p>Calling this method again for a wrapper whose session has already been released does
+   * nothing, so an explicit call after {@code SessionDataSetWrapper.hasNext()} returned false is
+   * safe.
+   *
+   * @param wrapper the result set returned by {@code executeQueryStatement}
+   * @throws StatementExecutionException if closing the operation handle reports an error; the
+   *     session is still put back into the pool in that case
+   */
+  public void closeResultSet(SessionDataSetWrapper wrapper) throws StatementExecutionException {
+    // Claim the session first: only the caller that actually removes it from the occupied map
+    // may return or discard it, which keeps the size counter and the queue consistent.
+    Session session = occupied.remove(wrapper.session);
+    if (session == null) {
+      return;
+    }
+    boolean broken = false;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException e) {
+      // The connection is broken: close it and free its slot instead of reusing it.
+      broken = true;
+      closeSession(session);
+      removeSession();
+    } finally {
+      if (!broken) {
+        putBack(session);
+      }
+    }
+  }
+
+  /**
+   * Frees the pool slot of a session that is being discarded, so that a later
+   * {@code getSession()} call may create a replacement. This only updates the counter; closing
+   * the session itself is left to the caller.
+   */
+  private synchronized void removeSession() {
+    // Logged at debug level, and without the password.
+    logger.debug("Remove a broken Session {}, {}, {}", ip, port, user);
+    size--;
+    // Let threads blocked in getSession() notice the free slot without waiting for their poll
+    // interval to elapse.
+    this.notifyAll();
+  }
+
+ private void closeSession(Session session) {
+ if (session != null) {
+ try {
+ session.close();
+ } catch (Exception e2) {
+ //do nothing. We just want to guarantee the session is closed.
+ }
+ }
+ }
+
+  /**
+   * use batch interface to insert sorted data. Times in the row batch must be sorted before!
+   *
+   * <p>If the connection turns out to be broken, the session is discarded and the insertion is
+   * retried on another session, up to {@code RETRY} attempts in total.
+   *
+   * @param rowBatch data batch
+   * @throws IoTDBConnectionException if every attempt failed with a connection error
+   * @throws BatchExecutionException if the server rejected the batch; the session is put back
+   *     into the pool before the exception is rethrown
+   */
+  public void insertSortedBatch(RowBatch rowBatch)
+      throws IoTDBConnectionException, BatchExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.insertSortedBatch(rowBatch);
+        putBack(session);
+        // Done: without this return the batch would be inserted RETRY times and the method
+        // would then fail with the "retry ... failed" exception below.
+        return;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        closeSession(session);
+        removeSession();
+      } catch (BatchExecutionException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+    throw new IoTDBConnectionException(
+        String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+  }
+
+
+ /**
+ * use batch interface to insert data
+ *
+ * @param rowBatch data batch
+ */
+ public void insertBatch(RowBatch rowBatch)
+ throws IoTDBConnectionException, BatchExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.insertBatch(rowBatch);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (BatchExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * Insert data in batch format, which can reduce the overhead of network. This method is just like
+ * jdbc batch insert, we pack some insert request in batch and send them to server If you want
+ * improve your performance, please see insertBatch method
+ *
+ * @see Session#insertBatch(RowBatch)
+ */
+ public void insertInBatch(List<String> deviceIds, List<Long> times,
+ List<List<String>> measurementsList, List<List<String>> valuesList)
+ throws IoTDBConnectionException, BatchExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.insertInBatch(deviceIds, times, measurementsList, valuesList);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (BatchExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * insert data in one row, if you want improve your performance, please use insertInBatch method
+ * or insertBatch method
+ *
+ * @see Session#insertInBatch(List, List, List, List)
+ * @see Session#insertBatch(RowBatch)
+ */
+ public TSStatus insert(String deviceId, long time, List<String> measurements, List<String> values)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ TSStatus resp = session.insert(deviceId, time, measurements, values);
+ putBack(session);
+ return resp;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * This method NOT insert data into database and the server just return after accept the request,
+ * this method should be used to test other time cost in client
+ */
+ public void testInsertBatch(RowBatch rowBatch)
+ throws IoTDBConnectionException, BatchExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.testInsertBatch(rowBatch);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (BatchExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * This method NOT insert data into database and the server just return after accept the request,
+ * this method should be used to test other time cost in client
+ */
+ public void testInsertInBatch(List<String> deviceIds, List<Long> times,
+ List<List<String>> measurementsList, List<List<String>> valuesList)
+ throws IoTDBConnectionException, BatchExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session
+ .testInsertInBatch(deviceIds, times, measurementsList, valuesList);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (BatchExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * This method NOT insert data into database and the server just return after accept the request,
+ * this method should be used to test other time cost in client
+ */
+ public void testInsert(String deviceId, long time, List<String> measurements,
+ List<String> values) throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.testInsert(deviceId, time, measurements, values);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * delete a timeseries, including data and schema
+ *
+ * @param path timeseries to delete, should be a whole path
+ */
+ public void deleteTimeseries(String path)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.deleteTimeseries(path);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * delete a timeseries, including data and schema
+ *
+ * @param paths timeseries to delete, should be a whole path
+ */
+ public void deleteTimeseries(List<String> paths)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.deleteTimeseries(paths);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * delete data <= time in one timeseries
+ *
+ * @param path data in which time series to delete
+ * @param time data with time stamp less than or equal to time will be deleted
+ */
+ public void deleteData(String path, long time)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.deleteData(path, time);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * delete data <= time in multiple timeseries
+ *
+ * @param paths data in which time series to delete
+ * @param time data with time stamp less than or equal to time will be deleted
+ */
+ public void deleteData(List<String> paths, long time)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.deleteData(paths, time);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ public void setStorageGroup(String storageGroupId)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.setStorageGroup(storageGroupId);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ public void deleteStorageGroup(String storageGroup)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.deleteStorageGroup(storageGroup);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ public void deleteStorageGroups(List<String> storageGroup)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.deleteStorageGroups(storageGroup);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ public void createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
+ CompressionType compressor) throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.createTimeseries(path, dataType, encoding, compressor);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ public boolean checkTimeseriesExists(String path)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ boolean resp = session.checkTimeseriesExists(path);
+ putBack(session);
+ return resp;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+   * Execute a query sql. Users must call closeResultSet(SessionDataSetWrapper) if they do not use
+   * the SessionDataSet any more. Users do not need to call sessionDataSet.closeOperationHandle()
+   * any more.
+ *
+ * @param sql query statement
+ * @return result set Notice that you must get the result instance. Otherwise a data leakage will
+ * happen
+ */
+ public SessionDataSetWrapper executeQueryStatement(String sql)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ SessionDataSet resp = session.executeQueryStatement(sql);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * execute non query statement
+ *
+ * @param sql non query statement
+ */
+ public void executeNonQueryStatement(String sql)
+ throws StatementExecutionException, IoTDBConnectionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ session.executeNonQueryStatement(sql);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (StatementExecutionException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+ throw new IoTDBConnectionException(
+ String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
new file mode 100644
index 0000000..59dda6b
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import static org.junit.Assert.*;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+//this test is not for testing the correctness of Session API. So we just implement one of the API.
+public class SessionPoolTest {
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+
+ @Test
+ public void insert() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ pool.insert("root.sg1.d1", 1, Collections.singletonList("s" + no), Collections.singletonList("3"));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ assertTrue(pool.currentAvailableSize() <= 3);
+ assertEquals(0, pool.currentOccupiedSize());
+ pool.close();
+ }
+
+ @Test
+ public void incorrectSQL() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ assertEquals(0, pool.currentAvailableSize());
+ try {
+ pool.insert(".root.sg1.d1", 1, Collections.singletonList("s" ), Collections.singletonList("3"));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ //do nothing
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ pool.close();
+ }
+
+
+ @Test
+ public void incorrectExecuteQueryStatement() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail();
+ }
+ }
+ //now let's query
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ SessionDataSetWrapper wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+ //this is incorrect becasue wrapper is not closed.
+ //so all other 7 queries will be blocked
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertFalse(service.awaitTermination(3, TimeUnit.SECONDS));
+ assertEquals(0, pool.currentAvailableSize());
+ assertTrue(pool.currentOccupiedSize() <= 3);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ pool.close();
+ }
+
+ @Test
+ public void executeQueryStatement() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ correctQuery(pool);
+ pool.close();
+ }
+
+ private void correctQuery(SessionPool pool) {
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail();
+ }
+ }
+ //now let's query
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ SessionDataSetWrapper wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+ pool.closeResultSet(wrapper);
+ pool.closeResultSet(wrapper);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ assertTrue(pool.currentAvailableSize() <= 3);
+ assertEquals(0, pool.currentOccupiedSize());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ @Test
+ public void tryIfTheServerIsRestart() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1,6000);
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail();
+ }
+ }
+ SessionDataSetWrapper wrapper = null;
+ try {
+ wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+ EnvironmentUtils.stopDaemon();
+ //user does not know what happens.
+ while(wrapper.hasNext()) {
+ wrapper.next();
+ }
+ } catch (IoTDBConnectionException e) {
+ try {
+ pool.closeResultSet(wrapper);
+ } catch (StatementExecutionException ex) {
+ ex.printStackTrace();
+ fail();
+ }
+ EnvironmentUtils.reactiveDaemon();
+ correctQuery(pool);
+ pool.close();
+ return;
+ } catch (StatementExecutionException e) {
+ fail("should be TTransportException but get an exception: " + e.getMessage());
+ }
+ fail("should throw exception but not");
+ }
+
+ @Test
+ public void tryIfTheServerIsRestartButDataIsGotten() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1,60000);
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i), Collections.singletonList("" + i));
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ fail();
+ }
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ SessionDataSetWrapper wrapper = null;
+ try {
+ wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+ //user does not know what happens.
+ assertEquals(0, pool.currentAvailableSize());
+ assertEquals(1, pool.currentOccupiedSize());
+ while(wrapper.hasNext()) {
+ wrapper.next();
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ assertEquals(0, pool.currentOccupiedSize());
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail();
+ }
+ pool.close();
+ }
+
+}
\ No newline at end of file