You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/14 02:18:59 UTC
[incubator-iotdb] 01/01: add session pool
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch session_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 19a78a105b8e804fdc22785bdf6efffce0d75f8c
Author: qiaojialin <64...@qq.com>
AuthorDate: Sat Mar 14 10:18:40 2020 +0800
add session pool
---
.../UserGuide/4-Client/3-Programming - Session.md | 18 +-
.../UserGuide/4-Client/3-Programming - Session.md | 24 +-
.../java/org/apache/iotdb/session/Session.java | 14 +-
.../org/apache/iotdb/session/SessionDataSet.java | 11 +-
.../iotdb/session/pool/SessionDataSetWrapper.java | 67 ++++
.../org/apache/iotdb/session/pool/SessionPool.java | 437 +++++++++++++++++++++
.../apache/iotdb/session/pool/SessionPoolTest.java | 295 ++++++++++++++
7 files changed, 860 insertions(+), 6 deletions(-)
diff --git a/docs/Documentation-CHN/UserGuide/4-Client/3-Programming - Session.md b/docs/Documentation-CHN/UserGuide/4-Client/3-Programming - Session.md
index 1db8623..d7aeb1d 100644
--- a/docs/Documentation-CHN/UserGuide/4-Client/3-Programming - Session.md
+++ b/docs/Documentation-CHN/UserGuide/4-Client/3-Programming - Session.md
@@ -103,4 +103,20 @@
浏览上述接口的详细信息,请参阅代码session/src/main/java/org/apache/iotdb/session/Session.java
- 使用上述接口的示例代码在example/session/src/main/java/org/apache/iotdb/SessionExample.java,在此文件中包含了开启session和执行批量插入等操作
\ No newline at end of file
+ 使用上述接口的示例代码在example/session/src/main/java/org/apache/iotdb/SessionExample.java,在此文件中包含了开启session和执行批量插入等操作
+
+ # 针对原生接口的连接池
+
+ 我们提供了一个针对原生接口的连接池(`SessionPool`),使用该接口时,你只需要指定连接池的大小,就可以在使用时从池中获取连接。
+ 如果超过60s都没得到一个连接的话,那么会打印一条警告日志,但是程序仍将继续等待。
+
+ 当一个连接被用完后,他会自动返回池中等待下次被使用;
+ 当一个连接损坏后,他会从池中被删除,并重建一个连接重新执行用户的操作。
+
+ 对于查询操作:
+
+ 1. 使用SessionPool进行查询时,得到的结果集是`SessionDataSet`的封装类`SessionDataSetWrapper`;
+ 2. 若对于一个查询的结果集,用户并没有遍历完且不再想继续遍历时,需要手动调用释放连接的操作`closeResultSet`;
+ 3. 若对一个查询的结果集遍历时出现异常,也需要手动调用释放连接的操作`closeResultSet`.
+
+ 使用示例可以参见 ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
diff --git a/docs/Documentation/UserGuide/4-Client/3-Programming - Session.md b/docs/Documentation/UserGuide/4-Client/3-Programming - Session.md
index cf446ef..18cd09b 100644
--- a/docs/Documentation/UserGuide/4-Client/3-Programming - Session.md
+++ b/docs/Documentation/UserGuide/4-Client/3-Programming - Session.md
@@ -113,4 +113,26 @@ Here we show the commonly used interfaces and their parameters in the Session:
To get more information of the following interfaces, please view session/src/main/java/org/apache/iotdb/session/Session.java
-The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
\ No newline at end of file
+The sample code of using these interfaces is in example/session/src/main/java/org/apache/iotdb/SessionExample.java,which provides an example of how to open an IoTDB session, execute a batch insertion.
+
+
+# Session Pool for Native API
+
+We provide a connection pool (`SessionPool`) for the Native API.
+To use it, you only need to specify the pool size.
+
+If no session can be obtained within 60 seconds, a warning is logged while the caller keeps waiting; once the pool's timeout (60 seconds by default) is also exceeded, the call fails with an exception.
+
+If a session has finished an operation, it will be put back to the pool automatically.
+If a session connection is broken, the session will be removed automatically and the pool will try
+to create a new session and redo the operation.
+
+For query operations:
+
+1. When using SessionPool to query data, the result set is `SessionDataSetWrapper`;
+2. Given a `SessionDataSetWrapper`, if you stop using it before you have scanned all of its data,
+you have to call `SessionPool.closeResultSet(wrapper)` manually;
+3. If calling `hasNext()` or `next()` on a `SessionDataSetWrapper` throws an exception,
+you have to call `SessionPool.closeResultSet(wrapper)` manually.
+
+Examples: ```session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java```
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index a3466a2..78791b9 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -59,6 +59,7 @@ public class Session {
private ZoneId zoneId;
private TSOperationHandle operationHandle;
private long statementId;
+ private int fetchSize;
public Session(String host, int port) {
@@ -74,6 +75,15 @@ public class Session {
this.port = port;
this.username = username;
this.password = password;
+ this.fetchSize = 10000;
+ }
+
+ public Session(String host, int port, String username, String password, int fetchSize) {
+ this.host = host;
+ this.port = port;
+ this.username = username;
+ this.password = password;
+ this.fetchSize = fetchSize;
}
public synchronized void open() throws IoTDBSessionException {
@@ -346,8 +356,10 @@ public class Session {
RpcUtils.verifySuccess(execResp.getStatus());
operationHandle = execResp.getOperationHandle();
- return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
+ SessionDataSet dataSet = new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
operationHandle.getOperationId().getQueryId(), client, operationHandle);
+ dataSet.setBatchSize(fetchSize);
+ return dataSet;
}
/**
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index fd055d3..5e5ecbe 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -47,6 +47,7 @@ public class SessionDataSet {
private TSOperationHandle operationHandle;
private int batchSize = 512;
private List<String> columnTypeDeduplicatedList;
+ private List<String> columnNames;
public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
long queryId, TSIService.Iface client, TSOperationHandle operationHandle) {
@@ -54,7 +55,7 @@ public class SessionDataSet {
this.queryId = queryId;
this.client = client;
this.operationHandle = operationHandle;
-
+ this.columnNames = columnNameList;
// deduplicate columnTypeList according to columnNameList
this.columnTypeDeduplicatedList = new ArrayList<>();
Set<String> columnSet = new HashSet<>(); // for deduplication
@@ -67,6 +68,10 @@ public class SessionDataSet {
}
}
+ public List<String> getColumnNames() {
+ return columnNames;
+ }
+
public int getBatchSize() {
return batchSize;
}
@@ -127,10 +132,10 @@ public class SessionDataSet {
RpcUtils.verifySuccess(closeResp);
}
} catch (IoTDBRPCException e) {
- throw new SQLException("Error occurs for close opeation in server side. The reason is " + e);
+ throw new SQLException("Error occurs for close opeation in server side. The reason is " + e, e);
} catch (TException e) {
throw new SQLException(
- "Error occurs when connecting to server for close operation, because: " + e);
+ "Error occurs when connecting to server for close operation, because: " + e, e);
}
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
new file mode 100644
index 0000000..2f3a067
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionDataSetWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+public class SessionDataSetWrapper {
+ SessionDataSet sessionDataSet;
+ Session session;
+ SessionPool pool;
+
+ public SessionDataSetWrapper(SessionDataSet sessionDataSet,
+ Session session, SessionPool pool) {
+ this.sessionDataSet = sessionDataSet;
+ this.session = session;
+ this.pool = pool;
+ }
+
+ protected Session getSession() {
+ return session;
+ }
+
+ public int getBatchSize() {
+ return sessionDataSet.getBatchSize();
+ }
+
+ public void setBatchSize(int batchSize) {
+ sessionDataSet.setBatchSize(batchSize);
+ }
+
+ public boolean hasNext() throws SQLException, IoTDBRPCException {
+ boolean next = sessionDataSet.hasNext();
+ if (!next) {
+ pool.closeResultSet(this);
+ }
+ return next;
+ }
+
+ public RowRecord next() throws SQLException, IoTDBRPCException {
+ return sessionDataSet.next();
+ }
+
+ public List<String> getColumnNames() {
+ return sessionDataSet.getColumnNames();
+ }
+}
\ No newline at end of file
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
new file mode 100644
index 0000000..9918ad9
--- /dev/null
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.RowBatch;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a set of Sessions.
+ * Using SessionPool, the user does not need to consider how to reuse a session connection.
+ * Even if a session is disconnected, the session pool can recognize it, remove the broken
+ * session connection and create a new one.
+ *
+ * If there are no available connections and the pool has reached its max size, all methods will
+ * block until a connection becomes available or the wait times out.
+ *
+ * If a user has waited for a session for more than 60 seconds, a warning log will be printed.
+ *
+ * The only thing you have to remember is that:
+ *
+ * For a query, if you have fetched all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok.
+ * Otherwise, i.e., you want to stop the query before you have fetched all data (SessionDataSetWrapper.hasNext() == true),
+ * then you have to call closeResultSet(SessionDataSetWrapper wrapper) manually.
+ * Otherwise the connection remains occupied by the query.
+ *
+ * Another case in which you have to call closeResultSet() manually is when an exception is thrown
+ * by SessionDataSetWrapper.hasNext() or next().
+ *
+ */
+public class SessionPool {
+
+ private static final Logger logger = LoggerFactory.getLogger(SessionPool.class);
+ private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+ //for session whose resultSet is not released.
+ private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+
+ private int size = 0;
+ private int maxSize = 0;
+ private String ip;
+ private int port;
+ private String user;
+ private String password;
+
+ private int fetchSize;
+
+ private long timeout = 60*1000; //ms
+ private static int RETRY = 3;
+
+ public SessionPool(String ip, int port, String user, String password, int maxSize) {
+ this(ip, port, user, password, maxSize, 10000, 60_000);
+ }
+
+ public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize, long timeout) {
+ this.maxSize = maxSize;
+ this.ip = ip;
+ this.port = port;
+ this.user = user;
+ this.password = password;
+ this.fetchSize = fetchSize;
+ this.timeout = timeout;
+ }
+
+ //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect.
+ //TODO: we can add a mechanism that if the user waits too long time, throw exception.
+ private Session getSession() throws IoTDBSessionException {
+ Session session = queue.poll();
+ if (session != null) {
+ return session;
+ } else {
+ synchronized (this) {
+ long start = System.currentTimeMillis();
+ while (session == null) {
+ if (size < maxSize) {
+ //we can create more session
+ size++;
+ //but we do it after leaving the synchronized block because connecting a session is time-consuming.
+ break;
+ } else {
+ //we have to wait until someone returns a session.
+ try {
+ this.wait(1000);
+ if (System.currentTimeMillis() - start > 60_000) {
+ logger.warn(
+ "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
+ (System.currentTimeMillis() - start) / 1000, ip, port, user, password);
+ if (System.currentTimeMillis() - start > timeout) {
+ throw new IoTDBSessionException(String.format("timeout to get a connection from %s:%s", ip, port));
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.error("the SessionPool is damaged", e);
+ Thread.currentThread().interrupt();
+ }
+ session = queue.poll();
+ }
+ }
+ if (session != null) {
+ return session;
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.error("Create a new Session {}, {}, {}, {}", ip, port, user, password);
+ }
+ session = new Session(ip, port, user, password, fetchSize);
+ session.open();
+ return session;
+ }
+ }
+
+ public int currentAvailableSize() {
+ return queue.size();
+ }
+
+ public int currentOccupiedSize() {
+ return occupied.size();
+ }
+
+ private void putBack(Session session) {
+ queue.push(session);
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ private void occupy(Session session) {
+ occupied.put(session, session);
+ }
+
+ /**
+ * close all connections in the pool
+ */
+ public synchronized void close() {
+ for (Session session : queue) {
+ try {
+ session.close();
+ } catch (IoTDBSessionException e) {
+ //do nothing
+ }
+ }
+ for (Session session : occupied.keySet()) {
+ try {
+ session.close();
+ } catch (IoTDBSessionException e) {
+ //do nothing
+ }
+ }
+ queue.clear();
+ occupied.clear();
+ }
+
+ public void closeResultSet(SessionDataSetWrapper wrapper) throws SQLException {
+ boolean putback = true;
+ try {
+ wrapper.sessionDataSet.closeOperationHandle();
+ } catch (SQLException e) {
+ if (e.getCause() instanceof TException) {
+ // the connection is broken.
+ removeSession();
+ putback = false;
+ } else {
+ throw e;
+ }
+ } finally {
+ Session session = occupied.remove(wrapper.session);
+ if (putback && session != null) {
+ putBack(wrapper.session);
+ }
+ }
+ }
+
+ private synchronized void removeSession() {
+ if (logger.isDebugEnabled()) {
+ logger.error("Remove a broken Session {}, {}, {}, {}", ip, port, user, password);
+ }
+ size--;
+ }
+
+ private void closeSession(Session session) {
+ if (session != null) {
+ try {
+ session.close();
+ } catch (Exception e2) {
+ //do nothing. We just want to guarantee the session is closed.
+ }
+ }
+ }
+
+ /**
+ * use batch interface to insert data
+ *
+ * @param rowBatch data batch
+ */
+ public TSExecuteBatchStatementResp insertBatch(RowBatch rowBatch) throws IoTDBSessionException {
+ for (int i=0; i< RETRY; i ++){
+ Session session = getSession();
+ try {
+ TSExecuteBatchStatementResp resp = session.insertBatch(rowBatch);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+
+
+ /**
+ * insert data in one row. If you want to improve performance, please use the insertInBatch method
+ * or the insertBatch method
+ *
+ * @see Session#insertBatch(RowBatch)
+ */
+ public TSStatus insert(String deviceId, long time, List<String> measurements, List<String> values)
+ throws IoTDBSessionException {
+ for (int i=0; i< RETRY; i ++){
+ Session session = getSession();
+ try {
+ TSStatus resp = session.insert(deviceId, time, measurements, values);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * delete a timeseries, including data and schema
+ *
+ * @param paths timeseries to delete, should be a whole path
+ */
+ public TSStatus deleteTimeseries(List<String> paths) throws IoTDBSessionException {
+ for (int i=0; i< RETRY; i ++){
+ Session session = getSession();
+ try {
+ TSStatus resp = session.deleteTimeseries(paths);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * delete data <= time in one timeseries
+ *
+ * @param path data in which time series to delete
+ * @param time data with time stamp less than or equal to time will be deleted
+ */
+ public TSStatus deleteData(String path, long time) throws IoTDBSessionException {
+ for (int i=0; i< RETRY; i ++){
+ Session session = getSession();
+ try {
+ TSStatus resp = session.deleteData(path, time);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ public TSStatus setStorageGroup(String storageGroupId) throws IoTDBSessionException {
+ for (int i=0; i< RETRY; i ++){
+ Session session = getSession();
+ try {
+ TSStatus resp = session.setStorageGroup(storageGroupId);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ public TSStatus createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
+ CompressionType compressor) throws IoTDBSessionException {
+ for (int i=0; i< RETRY; i ++){
+ Session session = getSession();
+ try {
+ TSStatus resp = session.createTimeseries(path, dataType, encoding, compressor);
+ putBack(session);
+ return resp;
+ } catch (IoTDBSessionException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * execute a query sql. Users must call closeResultSet(SessionDataSetWrapper) if they do not use the
+ * SessionDataSet any more. Users do not need to call sessionDataSet.closeOperationHandle() any
+ * more.
+ *
+ * @param sql query statement
+ * @return result set. Notice that you must keep the returned instance; otherwise the occupied session can never be released (a connection leak)
+ */
+ public SessionDataSetWrapper executeQueryStatement(String sql)
+ throws IoTDBRPCException, IoTDBSessionException {
+ for (int i=0; i< RETRY; i ++){
+ Session session = getSession();
+ try {
+ SessionDataSet resp = session.executeQueryStatement(sql);
+ SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+ occupy(session);
+ return wrapper;
+ } catch (TException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (IoTDBRPCException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+
+ /**
+ * execute non query statement
+ *
+ * @param sql non query statement
+ */
+ public void executeNonQueryStatement(String sql)
+ throws IoTDBRPCException, IoTDBSessionException {
+ for (int i=0; i< RETRY; i ++){
+ Session session = getSession();
+ try {
+ session.executeNonQueryStatement(sql);
+ putBack(session);
+ return;
+ } catch (TException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } catch (IoTDBRPCException e) {
+ if (e.getCause() instanceof TException) {
+ // TException means the connection is broken, remove it and get a new one.
+ closeSession(session);
+ removeSession();
+ } else {
+ putBack(session);
+ throw e;
+ }
+ }
+ }
+ throw new IoTDBSessionException(String.format("retry to execute statement on %s:%s failed %d times", ip, port, RETRY));
+ }
+}
diff --git a/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
new file mode 100644
index 0000000..440bb7b
--- /dev/null
+++ b/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.pool;
+
+import static org.junit.Assert.*;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.rpc.IoTDBRPCException;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.utils.EnvironmentUtils;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+//this test is not for testing the correctness of Session API. So we just implement one of the API.
+public class SessionPoolTest {
+ IoTDB daemon;
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+ IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+ daemon = IoTDB.getInstance();
+ daemon.active();
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ daemon.stop();
+ EnvironmentUtils.cleanEnv();
+ }
+
+
+ @Test
+ public void insert() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ try {
+ pool.setStorageGroup("root.sg1");
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ pool.insert("root.sg1.d1", 1, Collections.singletonList("s" + no),
+ Collections.singletonList("3"));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ assertTrue(pool.currentAvailableSize() <= 3);
+ assertEquals(0, pool.currentOccupiedSize());
+ pool.close();
+ }
+
+ @Test
+ public void incorrectSQL() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ assertEquals(0, pool.currentAvailableSize());
+ try {
+ pool.setStorageGroup("root.sg1");
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ try {
+ pool.insert(".root.sg1.d1", 1, Collections.singletonList("s"),
+ Collections.singletonList("3"));
+ } catch (IoTDBSessionException e) {
+ //do nothing
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ pool.close();
+ }
+
+
+ @Test
+ public void incorrectExecuteQueryStatement() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ try {
+ pool.setStorageGroup("root.sg1");
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i),
+ Collections.singletonList("" + i));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ }
+ //now let's query
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ SessionDataSetWrapper wrapper = pool
+ .executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+ //this is incorrect because the wrapper is not closed,
+ //so the other 7 queries will be blocked
+ } catch (IoTDBSessionException e) {
+ fail();
+ } catch (IoTDBRPCException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertFalse(service.awaitTermination(3, TimeUnit.SECONDS));
+ assertEquals(0, pool.currentAvailableSize());
+ assertTrue(pool.currentOccupiedSize() <= 3);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ pool.close();
+ }
+
+ @Test
+ public void executeQueryStatement() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+ correctQuery(pool);
+ pool.close();
+ }
+
+ private void correctQuery(SessionPool pool) {
+ ExecutorService service = Executors.newFixedThreadPool(10);
+ try {
+ pool.setStorageGroup("root.sg1");
+ } catch (IoTDBSessionException e) {
+ }
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i),
+ Collections.singletonList("" + i));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ }
+ //now let's query
+ for (int i = 0; i < 10; i++) {
+ final int no = i;
+ service.submit(() -> {
+ try {
+ SessionDataSetWrapper wrapper = pool
+ .executeQueryStatement("select * from root.sg1.d1 where time = " + no);
+ pool.closeResultSet(wrapper);
+ pool.closeResultSet(wrapper);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ });
+ }
+ service.shutdown();
+ try {
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ assertTrue(pool.currentAvailableSize() <= 3);
+ assertEquals(0, pool.currentOccupiedSize());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+// @Test
+ //failed because the server can not be closed.
+ public void tryIfTheServerIsRestart() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000);
+ try {
+ pool.setStorageGroup("root.sg1");
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i),
+ Collections.singletonList("" + i));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ }
+ SessionDataSetWrapper wrapper = null;
+ try {
+ wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+ daemon.stop();
+ //user does not know what happens.
+ while (wrapper.hasNext()) {
+ wrapper.next();
+ }
+ } catch (IoTDBRPCException e) {
+ e.printStackTrace();
+ fail();
+ } catch (IoTDBSessionException e) {
+ e.printStackTrace();
+ fail();
+ } catch (SQLException e) {
+ if (e.getCause() instanceof TException) {
+ try {
+ pool.closeResultSet(wrapper);
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ fail();
+ }
+ } else {
+ fail("should be TTransportException but get an exception: " + e.getMessage());
+ }
+ daemon.active();
+ correctQuery(pool);
+ pool.close();
+ return;
+ }
+ fail("should throw exception but not");
+ }
+
+ //@Test
+ //failed because the server can not be closed.
+ public void tryIfTheServerIsRestartButDataIsGotten() {
+ SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000);
+ try {
+ pool.setStorageGroup("root.sg1");
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ for (int i = 0; i < 10; i++) {
+ try {
+ pool.insert("root.sg1.d1", i, Collections.singletonList("s" + i),
+ Collections.singletonList("" + i));
+ } catch (IoTDBSessionException e) {
+ fail();
+ }
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ SessionDataSetWrapper wrapper = null;
+ try {
+ wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
+ //user does not know what happens.
+ assertEquals(0, pool.currentAvailableSize());
+ assertEquals(1, pool.currentOccupiedSize());
+ while (wrapper.hasNext()) {
+ wrapper.next();
+ }
+ assertEquals(1, pool.currentAvailableSize());
+ assertEquals(0, pool.currentOccupiedSize());
+ } catch (IoTDBRPCException e) {
+ e.printStackTrace();
+ fail();
+ } catch (IoTDBSessionException e) {
+ e.printStackTrace();
+ fail();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail();
+ }
+ pool.close();
+ }
+
+}
\ No newline at end of file