You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/01/25 02:07:34 UTC
[iotdb] branch master updated: [JAVA] Add timeout parameter in Session Level (#2562)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4a7d6c6 [JAVA] Add timeout parameter in Session Level (#2562)
4a7d6c6 is described below
commit 4a7d6c6d7e6700a33afb66dc3a1a42cb1c1bb0eb
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Mon Jan 25 10:07:14 2021 +0800
[JAVA] Add timeout parameter in Session Level (#2562)
---
.../main/java/org/apache/iotdb/SessionExample.java | 6 ++++
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 20 +++++++++--
.../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java | 3 +-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 42 +++++++++++++++-------
.../test/java/org/apache/iotdb/jdbc/BatchTest.java | 2 +-
.../org/apache/iotdb/jdbc/IoTDBConnectionTest.java | 16 ++++++++-
.../org/apache/iotdb/jdbc/IoTDBStatementTest.java | 17 ++++++---
.../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 4 +--
.../java/org/apache/iotdb/session/Session.java | 30 ++++++++++++++--
.../apache/iotdb/session/SessionConnection.java | 9 ++---
.../java/org/apache/iotdb/session/SessionUT.java | 9 +++++
thrift/src/main/thrift/rpc.thrift | 2 +-
12 files changed, 125 insertions(+), 35 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 1dadd5d..919ea17 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -76,6 +76,7 @@ public class SessionExample {
queryByIterator();
deleteData();
deleteTimeseries();
+ setTimeout();
session.close();
}
@@ -511,4 +512,9 @@ public class SessionExample {
private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException {
session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);");
}
+
+ private static void setTimeout() throws StatementExecutionException {
+ Session tempSession = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
+ tempSession.setTimeout(60000);
+ }
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 0f80755..b024793 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -71,6 +71,11 @@ public class IoTDBConnection implements Connection {
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TTransport transport;
+ /**
+ * Timeout of query can be set by users. Unit: s
+ * If not set, default value 0 will be used, which will use server configuration.
+ */
+ private int queryTimeout = 0;
private ZoneId zoneId;
private boolean autoCommit;
@@ -171,7 +176,7 @@ public class IoTDBConnection implements Connection {
if (isClosed) {
throw new SQLException("Cannot create statement because connection is closed");
}
- return new IoTDBStatement(this, getClient(), sessionId, zoneId);
+ return new IoTDBStatement(this, getClient(), sessionId, zoneId, queryTimeout);
}
@Override
@@ -186,7 +191,7 @@ public class IoTDBConnection implements Connection {
throw new SQLException(String.format("Statements with ResultSet type %d are not supported",
resultSetType));
}
- return new IoTDBStatement(this, getClient(), sessionId, zoneId);
+ return new IoTDBStatement(this, getClient(), sessionId, zoneId, queryTimeout);
}
@Override
@@ -391,6 +396,17 @@ public class IoTDBConnection implements Connection {
throw new SQLException("Does not support setNetworkTimeout");
}
+ public int getQueryTimeout() {
+ return this.queryTimeout;
+ }
+
+ public void setQueryTimeout(int seconds) throws SQLException {
+ if (seconds < 0) {
+ throw new SQLException(String.format("queryTimeout %d must be >= 0!", seconds));
+ }
+ this.queryTimeout = seconds;
+ }
+
@Override
public Savepoint setSavepoint() throws SQLException {
throw new SQLException("Does not support setSavepoint");
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
index ad25bd4..6c8cc1c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
@@ -110,7 +110,8 @@ public class IoTDBNonAlignJDBCResultSet extends AbstractIoTDBJDBCResultSet {
protected boolean fetchResults() throws SQLException {
TSFetchResultsReq req = new TSFetchResultsReq(ioTDBRpcDataSet.sessionId,
ioTDBRpcDataSet.sql, ioTDBRpcDataSet.fetchSize, ioTDBRpcDataSet.queryId,
- false, ioTDBRpcDataSet.timeout);
+ false);
+ req.setTimeout(ioTDBRpcDataSet.timeout);
try {
TSFetchResultsResp resp = ioTDBRpcDataSet.client.fetchResults(req);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 7f2e144..4f91808 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -19,6 +19,18 @@
package org.apache.iotdb.jdbc;
+import java.nio.ByteBuffer;
+import java.sql.BatchUpdateException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -34,14 +46,6 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;
-import java.nio.ByteBuffer;
-import java.sql.*;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
public class IoTDBStatement implements Statement {
ZoneId zoneId;
@@ -49,7 +53,11 @@ public class IoTDBStatement implements Statement {
private IoTDBConnection connection;
private int fetchSize;
- private int queryTimeout = 60;
+ /**
+ * Timeout of query can be set by users. Unit: s
+ * If not set, default value 0 will be used, which will use server configuration.
+ */
+ private int queryTimeout;
protected TSIService.Iface client;
private List<String> batchSQLList;
private static final String NOT_SUPPORT_EXECUTE = "Not support execute";
@@ -77,31 +85,38 @@ public class IoTDBStatement implements Statement {
* Constructor of IoTDBStatement.
*/
IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
- long sessionId, int fetchSize, ZoneId zoneId) throws SQLException {
+ long sessionId, int fetchSize, ZoneId zoneId, int seconds) throws SQLException {
this.connection = connection;
this.client = client;
this.sessionId = sessionId;
this.fetchSize = fetchSize;
this.batchSQLList = new ArrayList<>();
this.zoneId = zoneId;
+ this.queryTimeout = seconds;
requestStmtId();
}
// only for test
IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
- long sessionId, ZoneId zoneId, long statementId) {
+ long sessionId, ZoneId zoneId, int seconds, long statementId) {
this.connection = connection;
this.client = client;
this.sessionId = sessionId;
this.fetchSize = Config.DEFAULT_FETCH_SIZE;
this.batchSQLList = new ArrayList<>();
this.zoneId = zoneId;
+ this.queryTimeout = seconds;
this.stmtId = statementId;
}
IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
long sessionId, ZoneId zoneId) throws SQLException {
- this(connection, client, sessionId, Config.DEFAULT_FETCH_SIZE, zoneId);
+ this(connection, client, sessionId, Config.DEFAULT_FETCH_SIZE, zoneId, 0);
+ }
+
+ IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
+ long sessionId, ZoneId zoneId, int seconds) throws SQLException {
+ this(connection, client, sessionId, Config.DEFAULT_FETCH_SIZE, zoneId, seconds);
}
@Override
@@ -224,6 +239,7 @@ public class IoTDBStatement implements Statement {
isCancelled = false;
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, stmtId);
execReq.setFetchSize(fetchSize);
+ execReq.setTimeout((long) queryTimeout * 1000);
TSExecuteStatementResp execResp = client.executeStatement(execReq);
try {
RpcUtils.verifySuccess(execResp.getStatus());
@@ -300,7 +316,7 @@ public class IoTDBStatement implements Statement {
@Override
public ResultSet executeQuery(String sql) throws SQLException {
- return this.executeQuery(sql, 0);
+ return this.executeQuery(sql, (long) queryTimeout * 1000);
}
public ResultSet executeQuery(String sql, long timeoutInMS) throws SQLException {
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
index 01d7c9c..dc0fdb3 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
@@ -59,7 +59,7 @@ public class BatchTest {
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(connection.createStatement())
- .thenReturn(new IoTDBStatement(connection, client, sessionId, zoneID, 1L));
+ .thenReturn(new IoTDBStatement(connection, client, sessionId, zoneID, 0, 1L));
}
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
index b439117..d0cd709 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
@@ -21,13 +21,21 @@ package org.apache.iotdb.jdbc;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
+
+import java.sql.SQLException;
+import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@@ -90,4 +98,10 @@ public class IoTDBConnectionTest {
}
assertEquals(connection.getServerProperties().getTimestampPrecision(), timestampPrecision);
}
+
+ @Test
+ public void setTimeoutTest() throws SQLException {
+ connection.setQueryTimeout(60);
+ Assert.assertEquals(60, connection.getQueryTimeout());
+ }
}
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
index e957b17..884cff9 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
@@ -25,13 +25,11 @@ import static org.mockito.Mockito.when;
import java.sql.SQLException;
import java.time.ZoneId;
import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSIService.Iface;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@@ -55,7 +53,8 @@ public class IoTDBStatementTest {
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- when(connection.getMetaData()).thenReturn(new IoTDBDatabaseMetadata(connection, client, sessionId));
+ when(connection.getMetaData())
+ .thenReturn(new IoTDBDatabaseMetadata(connection, client, sessionId));
when(connection.isClosed()).thenReturn(false);
when(client.fetchMetadata(any(TSFetchMetadataReq.class))).thenReturn(fetchMetadataResp);
when(fetchMetadataResp.getStatus()).thenReturn(RpcUtils.SUCCESS_STATUS);
@@ -87,7 +86,7 @@ public class IoTDBStatementTest {
@Test
public void testSetFetchSize3() throws SQLException {
final int fetchSize = 10000;
- IoTDBStatement stmt = new IoTDBStatement(connection, client, sessionId, fetchSize, zoneID);
+ IoTDBStatement stmt = new IoTDBStatement(connection, client, sessionId, fetchSize, zoneID, 0);
assertEquals(fetchSize, stmt.getFetchSize());
}
@@ -97,4 +96,12 @@ public class IoTDBStatementTest {
IoTDBStatement stmt = new IoTDBStatement(connection, client, sessionId, zoneID);
stmt.setFetchSize(-1);
}
+
+ @Test
+ public void setTimeoutTest() throws SQLException {
+ IoTDBStatement statement = new IoTDBStatement(connection, client, sessionId, zoneID, 60);
+ Assert.assertEquals(60, statement.getQueryTimeout());
+ statement.setQueryTimeout(100);
+ Assert.assertEquals(100, statement.getQueryTimeout());
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 8ed69c2..1ad9db9 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -202,8 +202,8 @@ public class IoTDBRpcDataSet {
public boolean fetchResults() throws StatementExecutionException, IoTDBConnectionException {
rowsIndex = 0;
- TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true,
- timeout);
+ TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
+ req.setTimeout(timeout);
try {
TSFetchResultsResp resp = client.fetchResults(req);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index fb70a16..b971540 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -66,6 +66,12 @@ public class Session {
protected String username;
protected String password;
protected int fetchSize;
+
+ /**
+ * Timeout of query can be set by users.
+ * If not set, default value 0 will be used, which will use server configuration.
+ */
+ private long timeout = 0;
protected boolean enableRPCCompression;
protected int connectionTimeoutInMs;
protected ZoneId zoneId;
@@ -108,6 +114,14 @@ public class Session {
Config.DEFAULT_CACHE_LEADER_MODE);
}
+ public Session(String host, int rpcPort, String username, String password, int fetchSize,
+ long timeoutInMs) {
+ this(host, rpcPort, username, password, fetchSize, null,
+ Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE,
+ Config.DEFAULT_CACHE_LEADER_MODE);
+ this.timeout = timeoutInMs;
+ }
+
public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId,
@@ -313,7 +327,19 @@ public class Session {
public boolean checkTimeseriesExists(String path)
throws IoTDBConnectionException, StatementExecutionException {
- return defaultSessionConnection.checkTimeseriesExists(path);
+ return defaultSessionConnection.checkTimeseriesExists(path, timeout);
+ }
+
+
+ public void setTimeout(long timeoutInMs) throws StatementExecutionException {
+ if (timeoutInMs < 0) {
+ throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
+ }
+ this.timeout = timeoutInMs;
+ }
+
+ public long getTimeout() {
+ return timeout;
}
/**
@@ -324,7 +350,7 @@ public class Session {
*/
public SessionDataSet executeQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
- return defaultSessionConnection.executeQueryStatement(sql);
+ return defaultSessionConnection.executeQueryStatement(sql, timeout);
}
/**
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index cabd7d5..58c6af1 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -247,11 +247,11 @@ public class SessionConnection {
}
}
- protected boolean checkTimeseriesExists(String path)
+ protected boolean checkTimeseriesExists(String path, long timeout)
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet = null;
try {
- dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path));
+ dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
return dataSet.hasNext();
} finally {
if (dataSet != null) {
@@ -260,11 +260,6 @@ public class SessionConnection {
}
}
- protected SessionDataSet executeQueryStatement(String sql)
- throws StatementExecutionException, IoTDBConnectionException {
- return this.executeQueryStatement(sql, 0);
- }
-
protected SessionDataSet executeQueryStatement(String sql, long timeout)
throws StatementExecutionException, IoTDBConnectionException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionUT.java b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
index 24e2ad9..4452181 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -188,4 +189,12 @@ public class SessionUT {
session.setTimeZone("+09:00");
assertEquals("+09:00", session.getTimeZone());
}
+
+ @Test
+ public void setTimeout() throws StatementExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
+ Assert.assertEquals(20000, session.getTimeout());
+ session.setTimeout(60000);
+ Assert.assertEquals(60000, session.getTimeout());
+ }
}
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index a1a7ea5..83cb70c 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -155,7 +155,7 @@ struct TSFetchResultsReq{
3: required i32 fetchSize
4: required i64 queryId
5: required bool isAlign
- 6: required i64 timeout
+ 6: optional i64 timeout
}
struct TSFetchResultsResp{