You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/01/25 02:07:34 UTC

[iotdb] branch master updated: [JAVA] Add timeout parameter in Session Level (#2562)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a7d6c6  [JAVA] Add timeout parameter in Session Level (#2562)
4a7d6c6 is described below

commit 4a7d6c6d7e6700a33afb66dc3a1a42cb1c1bb0eb
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Mon Jan 25 10:07:14 2021 +0800

    [JAVA] Add timeout parameter in Session Level (#2562)
---
 .../main/java/org/apache/iotdb/SessionExample.java |  6 ++++
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     | 20 +++++++++--
 .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java     |  3 +-
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 42 +++++++++++++++-------
 .../test/java/org/apache/iotdb/jdbc/BatchTest.java |  2 +-
 .../org/apache/iotdb/jdbc/IoTDBConnectionTest.java | 16 ++++++++-
 .../org/apache/iotdb/jdbc/IoTDBStatementTest.java  | 17 ++++++---
 .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java |  4 +--
 .../java/org/apache/iotdb/session/Session.java     | 30 ++++++++++++++--
 .../apache/iotdb/session/SessionConnection.java    |  9 ++---
 .../java/org/apache/iotdb/session/SessionUT.java   |  9 +++++
 thrift/src/main/thrift/rpc.thrift                  |  2 +-
 12 files changed, 125 insertions(+), 35 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 1dadd5d..919ea17 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -76,6 +76,7 @@ public class SessionExample {
     queryByIterator();
     deleteData();
     deleteTimeseries();
+    setTimeout();
     session.close();
   }
 
@@ -511,4 +512,9 @@ public class SessionExample {
   private static void nonQuery() throws IoTDBConnectionException, StatementExecutionException {
     session.executeNonQueryStatement("insert into root.sg1.d1(timestamp,s1) values(200, 1);");
   }
+
+  private static void setTimeout() throws StatementExecutionException {
+    Session tempSession = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
+    tempSession.setTimeout(60000);
+  }
 }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 0f80755..b024793 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -71,6 +71,11 @@ public class IoTDBConnection implements Connection {
   private boolean isClosed = true;
   private SQLWarning warningChain = null;
   private TTransport transport;
+  /**
+   * Query timeout in seconds; it can be set by users.
+   * If not set, the default value 0 is used, which means the server configuration applies.
+   */
+  private int queryTimeout = 0;
   private ZoneId zoneId;
   private boolean autoCommit;
 
@@ -171,7 +176,7 @@ public class IoTDBConnection implements Connection {
     if (isClosed) {
       throw new SQLException("Cannot create statement because connection is closed");
     }
-    return new IoTDBStatement(this, getClient(), sessionId, zoneId);
+    return new IoTDBStatement(this, getClient(), sessionId, zoneId, queryTimeout);
   }
 
   @Override
@@ -186,7 +191,7 @@ public class IoTDBConnection implements Connection {
       throw new SQLException(String.format("Statements with ResultSet type %d are not supported",
           resultSetType));
     }
-    return new IoTDBStatement(this, getClient(), sessionId, zoneId);
+    return new IoTDBStatement(this, getClient(), sessionId, zoneId, queryTimeout);
   }
 
   @Override
@@ -391,6 +396,17 @@ public class IoTDBConnection implements Connection {
     throw new SQLException("Does not support setNetworkTimeout");
   }
 
+  public int getQueryTimeout() {
+    return this.queryTimeout;
+  }
+
+  public void setQueryTimeout(int seconds) throws SQLException {
+    if (seconds < 0) {
+      throw new SQLException(String.format("queryTimeout %d must be >= 0!", seconds));
+    }
+    this.queryTimeout = seconds;
+  }
+
   @Override
   public Savepoint setSavepoint() throws SQLException {
     throw new SQLException("Does not support setSavepoint");
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
index ad25bd4..6c8cc1c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
@@ -110,7 +110,8 @@ public class IoTDBNonAlignJDBCResultSet extends AbstractIoTDBJDBCResultSet {
   protected boolean fetchResults() throws SQLException {
     TSFetchResultsReq req = new TSFetchResultsReq(ioTDBRpcDataSet.sessionId,
         ioTDBRpcDataSet.sql, ioTDBRpcDataSet.fetchSize, ioTDBRpcDataSet.queryId,
-        false, ioTDBRpcDataSet.timeout);
+        false);
+    req.setTimeout(ioTDBRpcDataSet.timeout);
     try {
       TSFetchResultsResp resp = ioTDBRpcDataSet.client.fetchResults(req);
 
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 7f2e144..4f91808 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -19,6 +19,18 @@
 
 package org.apache.iotdb.jdbc;
 
+import java.nio.ByteBuffer;
+import java.sql.BatchUpdateException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -34,14 +46,6 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.thrift.TException;
 
-import java.nio.ByteBuffer;
-import java.sql.*;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
 public class IoTDBStatement implements Statement {
 
   ZoneId zoneId;
@@ -49,7 +53,11 @@ public class IoTDBStatement implements Statement {
   private IoTDBConnection connection;
   private int fetchSize;
 
-  private int queryTimeout = 60;
+  /**
+   * Query timeout in seconds; it can be set by users.
+   * If not set, the default value 0 is used, which means the server configuration applies.
+   */
+  private int queryTimeout;
   protected TSIService.Iface client;
   private List<String> batchSQLList;
   private static final String NOT_SUPPORT_EXECUTE = "Not support execute";
@@ -77,31 +85,38 @@ public class IoTDBStatement implements Statement {
    * Constructor of IoTDBStatement.
    */
   IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
-      long sessionId, int fetchSize, ZoneId zoneId) throws SQLException {
+      long sessionId, int fetchSize, ZoneId zoneId, int seconds) throws SQLException {
     this.connection = connection;
     this.client = client;
     this.sessionId = sessionId;
     this.fetchSize = fetchSize;
     this.batchSQLList = new ArrayList<>();
     this.zoneId = zoneId;
+    this.queryTimeout = seconds;
     requestStmtId();
   }
 
   // only for test
   IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
-      long sessionId, ZoneId zoneId, long statementId) {
+      long sessionId, ZoneId zoneId, int seconds, long statementId) {
     this.connection = connection;
     this.client = client;
     this.sessionId = sessionId;
     this.fetchSize = Config.DEFAULT_FETCH_SIZE;
     this.batchSQLList = new ArrayList<>();
     this.zoneId = zoneId;
+    this.queryTimeout = seconds;
     this.stmtId = statementId;
   }
 
   IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
       long sessionId, ZoneId zoneId) throws SQLException {
-    this(connection, client, sessionId, Config.DEFAULT_FETCH_SIZE, zoneId);
+    this(connection, client, sessionId, Config.DEFAULT_FETCH_SIZE, zoneId, 0);
+  }
+
+  IoTDBStatement(IoTDBConnection connection, TSIService.Iface client,
+      long sessionId, ZoneId zoneId, int seconds) throws SQLException {
+    this(connection, client, sessionId, Config.DEFAULT_FETCH_SIZE, zoneId, seconds);
   }
 
   @Override
@@ -224,6 +239,7 @@ public class IoTDBStatement implements Statement {
     isCancelled = false;
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, stmtId);
     execReq.setFetchSize(fetchSize);
+    execReq.setTimeout((long) queryTimeout * 1000);
     TSExecuteStatementResp execResp = client.executeStatement(execReq);
     try {
       RpcUtils.verifySuccess(execResp.getStatus());
@@ -300,7 +316,7 @@ public class IoTDBStatement implements Statement {
 
   @Override
   public ResultSet executeQuery(String sql) throws SQLException {
-    return this.executeQuery(sql, 0);
+    return this.executeQuery(sql, (long) queryTimeout * 1000);
   }
 
   public ResultSet executeQuery(String sql, long timeoutInMS) throws SQLException {
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
index 01d7c9c..dc0fdb3 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
@@ -59,7 +59,7 @@ public class BatchTest {
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     when(connection.createStatement())
-        .thenReturn(new IoTDBStatement(connection, client, sessionId, zoneID, 1L));
+        .thenReturn(new IoTDBStatement(connection, client, sessionId, zoneID, 0, 1L));
 
   }
 
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
index b439117..d0cd709 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBConnectionTest.java
@@ -21,13 +21,21 @@ package org.apache.iotdb.jdbc;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
+
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.TException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -90,4 +98,10 @@ public class IoTDBConnectionTest {
     }
     assertEquals(connection.getServerProperties().getTimestampPrecision(), timestampPrecision);
   }
+
+  @Test
+  public void setTimeoutTest() throws SQLException {
+    connection.setQueryTimeout(60);
+    Assert.assertEquals(60, connection.getQueryTimeout());
+  }
 }
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
index e957b17..884cff9 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
@@ -25,13 +25,11 @@ import static org.mockito.Mockito.when;
 import java.sql.SQLException;
 import java.time.ZoneId;
 import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
 import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService.Iface;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
-import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -55,7 +53,8 @@ public class IoTDBStatementTest {
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
-    when(connection.getMetaData()).thenReturn(new IoTDBDatabaseMetadata(connection, client, sessionId));
+    when(connection.getMetaData())
+        .thenReturn(new IoTDBDatabaseMetadata(connection, client, sessionId));
     when(connection.isClosed()).thenReturn(false);
     when(client.fetchMetadata(any(TSFetchMetadataReq.class))).thenReturn(fetchMetadataResp);
     when(fetchMetadataResp.getStatus()).thenReturn(RpcUtils.SUCCESS_STATUS);
@@ -87,7 +86,7 @@ public class IoTDBStatementTest {
   @Test
   public void testSetFetchSize3() throws SQLException {
     final int fetchSize = 10000;
-    IoTDBStatement stmt = new IoTDBStatement(connection, client, sessionId, fetchSize, zoneID);
+    IoTDBStatement stmt = new IoTDBStatement(connection, client, sessionId, fetchSize, zoneID, 0);
     assertEquals(fetchSize, stmt.getFetchSize());
   }
 
@@ -97,4 +96,12 @@ public class IoTDBStatementTest {
     IoTDBStatement stmt = new IoTDBStatement(connection, client, sessionId, zoneID);
     stmt.setFetchSize(-1);
   }
+
+  @Test
+  public void setTimeoutTest() throws SQLException {
+    IoTDBStatement statement = new IoTDBStatement(connection, client, sessionId, zoneID, 60);
+    Assert.assertEquals(60, statement.getQueryTimeout());
+    statement.setQueryTimeout(100);
+    Assert.assertEquals(100, statement.getQueryTimeout());
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 8ed69c2..1ad9db9 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -202,8 +202,8 @@ public class IoTDBRpcDataSet {
 
   public boolean fetchResults() throws StatementExecutionException, IoTDBConnectionException {
     rowsIndex = 0;
-    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true,
-        timeout);
+    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
+    req.setTimeout(timeout);
     try {
       TSFetchResultsResp resp = client.fetchResults(req);
 
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index fb70a16..b971540 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -66,6 +66,12 @@ public class Session {
   protected String username;
   protected String password;
   protected int fetchSize;
+
+  /**
+   * Query timeout in milliseconds; it can be set by users.
+   * If not set, the default value 0 is used, which means the server configuration applies.
+   */
+  private long timeout = 0;
   protected boolean enableRPCCompression;
   protected int connectionTimeoutInMs;
   protected ZoneId zoneId;
@@ -108,6 +114,14 @@ public class Session {
         Config.DEFAULT_CACHE_LEADER_MODE);
   }
 
+  public Session(String host, int rpcPort, String username, String password, int fetchSize,
+      long timeoutInMs) {
+    this(host, rpcPort, username, password, fetchSize, null,
+        Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE,
+        Config.DEFAULT_CACHE_LEADER_MODE);
+    this.timeout = timeoutInMs;
+  }
+
 
   public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) {
     this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId,
@@ -313,7 +327,19 @@ public class Session {
 
   public boolean checkTimeseriesExists(String path)
       throws IoTDBConnectionException, StatementExecutionException {
-    return defaultSessionConnection.checkTimeseriesExists(path);
+    return defaultSessionConnection.checkTimeseriesExists(path, timeout);
+  }
+
+
+  public void setTimeout(long timeoutInMs) throws StatementExecutionException {
+    if (timeoutInMs < 0) {
+      throw new StatementExecutionException("Timeout must be >= 0, please check and try again.");
+    }
+    this.timeout = timeoutInMs;
+  }
+
+  public long getTimeout() {
+    return timeout;
   }
 
   /**
@@ -324,7 +350,7 @@ public class Session {
    */
   public SessionDataSet executeQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
-    return defaultSessionConnection.executeQueryStatement(sql);
+    return defaultSessionConnection.executeQueryStatement(sql, timeout);
   }
 
   /**
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index cabd7d5..58c6af1 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -247,11 +247,11 @@ public class SessionConnection {
     }
   }
 
-  protected boolean checkTimeseriesExists(String path)
+  protected boolean checkTimeseriesExists(String path, long timeout)
       throws IoTDBConnectionException, StatementExecutionException {
     SessionDataSet dataSet = null;
     try {
-      dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path));
+      dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path), timeout);
       return dataSet.hasNext();
     } finally {
       if (dataSet != null) {
@@ -260,11 +260,6 @@ public class SessionConnection {
     }
   }
 
-  protected SessionDataSet executeQueryStatement(String sql)
-      throws StatementExecutionException, IoTDBConnectionException {
-    return this.executeQueryStatement(sql, 0);
-  }
-
   protected SessionDataSet executeQueryStatement(String sql, long timeout)
       throws StatementExecutionException, IoTDBConnectionException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionUT.java b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
index 24e2ad9..4452181 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -188,4 +189,12 @@ public class SessionUT {
     session.setTimeZone("+09:00");
     assertEquals("+09:00", session.getTimeZone());
   }
+
+  @Test
+  public void setTimeout() throws StatementExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root", 10000, 20000);
+    Assert.assertEquals(20000, session.getTimeout());
+    session.setTimeout(60000);
+    Assert.assertEquals(60000, session.getTimeout());
+  }
 }
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index a1a7ea5..83cb70c 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -155,7 +155,7 @@ struct TSFetchResultsReq{
   3: required i32 fetchSize
   4: required i64 queryId
   5: required bool isAlign
-  6: required i64 timeout
+  6: optional i64 timeout
 }
 
 struct TSFetchResultsResp{