You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/01/13 10:37:36 UTC
[iotdb] branch master updated: [IOTDB-965] Add timeout in query (#2352)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 652be96 [IOTDB-965] Add timeout in query (#2352)
652be96 is described below
commit 652be96969c8b35415f3dea51feb37be9108dad3
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Jan 13 18:37:17 2021 +0800
[IOTDB-965] Add timeout in query (#2352)
---
.../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 15 ++
docs/UserGuide/Client/Status Codes.md | 2 +
docs/zh/UserGuide/Client/Status Codes.md | 2 +
.../main/java/org/apache/iotdb/SessionExample.java | 20 ++-
.../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java | 4 +-
.../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 4 +-
.../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java | 6 +-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 29 +++-
.../resources/conf/iotdb-engine.properties | 3 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 3 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 3 +
.../org/apache/iotdb/db/engine/StorageEngine.java | 11 --
.../db/exception/QueryIdNotExsitException.java | 26 +---
.../query/QueryTimeoutRuntimeException.java | 32 ++--
.../org/apache/iotdb/db/mqtt/PublishHandler.java | 123 +++++++--------
.../main/java/org/apache/iotdb/db/qp/Planner.java | 1 +
.../apache/iotdb/db/qp/constant/SQLConstant.java | 2 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 58 ++++++-
.../org/apache/iotdb/db/qp/logical/Operator.java | 2 +-
.../iotdb/db/qp/logical/sys/KillQueryOperator.java | 37 ++---
.../iotdb/db/qp/physical/sys/KillQueryPlan.java | 39 ++---
.../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 2 +-
.../qp/physical/sys/ShowQueryProcesslistPlan.java | 26 +---
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 19 +++
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 11 +-
.../db/query/control/QueryResourceManager.java | 3 +
.../iotdb/db/query/control/QueryTimeManager.java | 172 +++++++++++++++++++++
.../db/query/dataset/NonAlignEngineDataSet.java | 22 +++
.../dataset/RawQueryDataSetWithoutValueFilter.java | 39 ++++-
.../db/query/executor/RawDataQueryExecutor.java | 4 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 14 ++
.../org/apache/iotdb/db/service/ServiceType.java | 1 +
.../org/apache/iotdb/db/service/TSServiceImpl.java | 62 ++++++--
.../iotdb/db/integration/IOTDBGroupByIT.java | 1 -
.../iotdb/db/integration/IoTDBKillQueryTest.java | 84 ++++++++++
.../db/integration/IoTDBQueryTimeoutTest.java | 154 ++++++++++++++++++
.../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 7 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../main/java/org/apache/iotdb/session/Config.java | 3 +-
.../java/org/apache/iotdb/session/Session.java | 21 ++-
.../apache/iotdb/session/SessionConnection.java | 9 +-
.../org/apache/iotdb/session/SessionDataSet.java | 12 +-
.../iotdb/session/IoTDBSessionIteratorIT.java | 29 ++++
thrift/src/main/thrift/rpc.thrift | 3 +
.../exception/QueryTimeoutRuntimeException.java | 32 ++--
.../iotdb/tsfile/read/reader/LocalTsFileInput.java | 20 +++
47 files changed, 955 insertions(+), 231 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index 849ae49..aa5e874 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -79,6 +79,8 @@ statement
| SHOW CHILD PATHS prefixPath? #showChildPaths
| SHOW DEVICES prefixPath? limitClause? #showDevices
| SHOW MERGE #showMergeStatus
+ | SHOW QUERY PROCESSLIST #showQueryProcesslist
+ | KILL QUERY INT? #killQuery
| TRACING ON #tracingOn
| TRACING OFF #tracingOff
| COUNT TIMESERIES prefixPath? (GROUP BY LEVEL OPERATOR_EQ INT)? #countTimeseries
@@ -714,6 +716,19 @@ SHOW
: S H O W
;
+QUERY
+ : Q U E R Y
+ ;
+
+KILL
+ : K I L L
+ ;
+
+PROCESSLIST
+ : P R O C E S S L I S T
+ ;
+
+
GRANT
: G R A N T
;
diff --git a/docs/UserGuide/Client/Status Codes.md b/docs/UserGuide/Client/Status Codes.md
index 59f7ae9..62f15cb 100644
--- a/docs/UserGuide/Client/Status Codes.md
+++ b/docs/UserGuide/Client/Status Codes.md
@@ -80,6 +80,8 @@ Here is a list of Status Code and related message:
|410|PATH_ERROR|Path related error|
|411|QUERY_PROCESS_ERROR|Query process related error|
|412|WRITE_PROCESS_ERROR|Writing data related error|
+|413|WRITE_PROCESS_REJECT|Writing data rejected error|
+|414|QUERY_ID_NOT_EXIST|Kill query with non existent queryId|
|500|INTERNAL_SERVER_ERROR|Internal server error|
|501|CLOSE_OPERATION_ERROR|Meet error in close operation|
|502|READ_ONLY_SYSTEM_ERROR|Operating system is read only|
diff --git a/docs/zh/UserGuide/Client/Status Codes.md b/docs/zh/UserGuide/Client/Status Codes.md
index e881ec7..dbe552f 100644
--- a/docs/zh/UserGuide/Client/Status Codes.md
+++ b/docs/zh/UserGuide/Client/Status Codes.md
@@ -80,6 +80,8 @@ try {
|410|PATH_ERROR|路径相关错误|
|411|QUERY_PROCESS_ERROR|查询处理相关错误|
|412|WRITE_PROCESS_ERROR|写入相关错误|
+|413|WRITE_PROCESS_REJECT|写入拒绝错误|
+|414|QUERY_ID_NOT_EXIST|Query id 不存在|
|500|INTERNAL_SERVER_ERROR|服务器内部错误|
|501|CLOSE_OPERATION_ERROR|关闭操作错误|
|502|READ_ONLY_SYSTEM_ERROR|系统只读|
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 1ab093f..d57d478 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -58,8 +58,9 @@ public class SessionExample {
try {
session.setStorageGroup("root.sg1");
} catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode())
+ if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
throw e;
+ }
}
createTimeseries();
@@ -70,6 +71,7 @@ public class SessionExample {
insertRecords();
nonQuery();
query();
+ queryWithTimeout();
rawDataQuery();
queryByIterator();
deleteData();
@@ -178,7 +180,8 @@ public class SessionExample {
}
}
- private static void insertStrRecord() throws IoTDBConnectionException, StatementExecutionException {
+ private static void insertStrRecord()
+ throws IoTDBConnectionException, StatementExecutionException {
String deviceId = ROOT_SG1_D1;
List<String> measurements = new ArrayList<>();
measurements.add("s1");
@@ -249,6 +252,7 @@ public class SessionExample {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
}
+
/**
* insert the data of a device. For each timestamp, the number of measurements is the same.
*
@@ -431,6 +435,18 @@ public class SessionExample {
dataSet.closeOperationHandle();
}
+ private static void queryWithTimeout()
+ throws IoTDBConnectionException, StatementExecutionException {
+ SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1", 2000);
+ System.out.println(dataSet.getColumnNames());
+ dataSet.setFetchSize(1024); // default is 10000
+ while (dataSet.hasNext()) {
+ System.out.println(dataSet.next());
+ }
+
+ dataSet.closeOperationHandle();
+ }
+
private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException {
List<String> paths = new ArrayList<>();
paths.add(ROOT_SG1_D1_S1);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java
index 99c300f..259b24b 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java
@@ -58,11 +58,11 @@ public abstract class AbstractIoTDBJDBCResultSet implements ResultSet {
public AbstractIoTDBJDBCResultSet(Statement statement, List<String> columnNameList,
List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
TSIService.Iface client,
- String sql, long queryId, long sessionId)
+ String sql, long queryId, long sessionId, long timeout)
throws SQLException {
this.ioTDBRpcDataSet = new IoTDBRpcDataSet(sql, columnNameList, columnTypeList,
columnNameIndex, ignoreTimeStamp, queryId, client, sessionId, null,
- statement.getFetchSize());
+ statement.getFetchSize(), timeout);
this.statement = statement;
this.columnTypeList = columnTypeList;
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
index cda5048..84f28ec 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
@@ -33,10 +33,10 @@ public class IoTDBJDBCResultSet extends AbstractIoTDBJDBCResultSet {
public IoTDBJDBCResultSet(Statement statement, List<String> columnNameList,
List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
TSIService.Iface client,
- String sql, long queryId, long sessionId, TSQueryDataSet dataset)
+ String sql, long queryId, long sessionId, TSQueryDataSet dataset, long timeout)
throws SQLException {
super(statement, columnNameList, columnTypeList, columnNameIndex, ignoreTimeStamp, client, sql,
- queryId, sessionId);
+ queryId, sessionId, timeout);
ioTDBRpcDataSet.setTsQueryDataSet(dataset);
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
index dd0b513..ad25bd4 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
@@ -52,10 +52,10 @@ public class IoTDBNonAlignJDBCResultSet extends AbstractIoTDBJDBCResultSet {
IoTDBNonAlignJDBCResultSet(Statement statement, List<String> columnNameList,
List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
TSIService.Iface client,
- String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset)
+ String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset, long timeout)
throws SQLException {
super(statement, columnNameList, columnTypeList, columnNameIndex, ignoreTimeStamp, client, sql,
- queryId, sessionId);
+ queryId, sessionId, timeout);
times = new byte[columnNameList.size()][Long.BYTES];
@@ -110,7 +110,7 @@ public class IoTDBNonAlignJDBCResultSet extends AbstractIoTDBJDBCResultSet {
protected boolean fetchResults() throws SQLException {
TSFetchResultsReq req = new TSFetchResultsReq(ioTDBRpcDataSet.sessionId,
ioTDBRpcDataSet.sql, ioTDBRpcDataSet.fetchSize, ioTDBRpcDataSet.queryId,
- false);
+ false, ioTDBRpcDataSet.timeout);
try {
TSFetchResultsResp resp = ioTDBRpcDataSet.client.fetchResults(req);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index d7cc9ec..8dab6e5 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -48,7 +48,11 @@ public class IoTDBStatement implements Statement {
private ResultSet resultSet = null;
private IoTDBConnection connection;
private int fetchSize;
- private int queryTimeout = 10;
+
+ /**
+ * query timeout with seconds unit
+ */
+ private int queryTimeout = 60;
protected TSIService.Iface client;
private List<String> batchSQLList;
private static final String NOT_SUPPORT_EXECUTE = "Not support execute";
@@ -223,6 +227,7 @@ public class IoTDBStatement implements Statement {
isCancelled = false;
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, stmtId);
execReq.setFetchSize(fetchSize);
+ execReq.setTimeout((long) queryTimeout * 1000);
TSExecuteStatementResp execResp = client.executeStatement(execReq);
try {
RpcUtils.verifySuccess(execResp.getStatus());
@@ -236,11 +241,11 @@ public class IoTDBStatement implements Statement {
if (execResp.queryDataSet == null) {
this.resultSet = new IoTDBNonAlignJDBCResultSet(this, execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.ignoreTimeStamp,
- client, sql, queryId, sessionId, execResp.nonAlignQueryDataSet);
+ client, sql, queryId, sessionId, execResp.nonAlignQueryDataSet, execReq.timeout);
} else {
this.resultSet = new IoTDBJDBCResultSet(this, execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.ignoreTimeStamp,
- client, sql, queryId, sessionId, execResp.queryDataSet);
+ client, sql, queryId, sessionId, execResp.queryDataSet, execReq.timeout);
}
return true;
}
@@ -299,14 +304,21 @@ public class IoTDBStatement implements Statement {
@Override
public ResultSet executeQuery(String sql) throws SQLException {
+ return this.executeQuery(sql, (long) this.queryTimeout * 1000);
+ }
+
+ public ResultSet executeQuery(String sql, long timeoutInMS) throws SQLException {
checkConnection("execute query");
+ if (timeoutInMS <= 0) {
+ throw new SQLException("Timeout must be over 0, please check and try again.");
+ }
isClosed = false;
try {
- return executeQuerySQL(sql);
+ return executeQuerySQL(sql, timeoutInMS);
} catch (TException e) {
if (reConnect()) {
try {
- return executeQuerySQL(sql);
+ return executeQuerySQL(sql, timeoutInMS);
} catch (TException e2) {
throw new SQLException(
"Fail to executeQuery " + sql + "after reconnecting. please check server status", e2);
@@ -319,10 +331,11 @@ public class IoTDBStatement implements Statement {
}
}
- private ResultSet executeQuerySQL(String sql) throws TException, SQLException {
+ private ResultSet executeQuerySQL(String sql, long timeoutInMS) throws TException, SQLException {
isCancelled = false;
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, stmtId);
execReq.setFetchSize(fetchSize);
+ execReq.setTimeout(timeoutInMS);
TSExecuteStatementResp execResp = client.executeQueryStatement(execReq);
queryId = execResp.getQueryId();
try {
@@ -338,11 +351,11 @@ public class IoTDBStatement implements Statement {
if (execResp.queryDataSet == null) {
this.resultSet = new IoTDBNonAlignJDBCResultSet(this, execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.ignoreTimeStamp, client,
- sql, queryId, sessionId, execResp.nonAlignQueryDataSet);
+ sql, queryId, sessionId, execResp.nonAlignQueryDataSet, execReq.timeout);
} else {
this.resultSet = new IoTDBJDBCResultSet(this, execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.ignoreTimeStamp, client,
- sql, queryId, sessionId, execResp.queryDataSet);
+ sql, queryId, sessionId, execResp.queryDataSet, execReq.timeout);
}
return resultSet;
}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index fbbba13..8201e9d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -371,6 +371,9 @@ compaction_thread_num=10
# The limit of write throughput merge can reach per second
merge_write_throughput_mb_per_sec=8
+# The max executing time of query. unit: ms
+query_time_threshold=60000
+
####################
### Metadata Cache Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d0918f1..0871744 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -477,6 +477,11 @@ public class IoTDBConfig {
private long cacheFileReaderClearPeriod = 100000;
/**
+ * the max executing time of query in ms.
+ */
+ private int queryTimeThreshold = 60000;
+
+ /**
* Replace implementation class of JDBC service
*/
private String rpcImplClassName = TSServiceImpl.class.getName();
@@ -1314,6 +1319,14 @@ public class IoTDBConfig {
this.cacheFileReaderClearPeriod = cacheFileReaderClearPeriod;
}
+ public int getQueryTimeThreshold() {
+ return queryTimeThreshold;
+ }
+
+ public void setQueryTimeThreshold(int queryTimeThreshold) {
+ this.queryTimeThreshold = queryTimeThreshold;
+ }
+
public boolean isReadOnly() {
return readOnly;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index edd8607..aac1541 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -72,6 +72,9 @@ public class IoTDBConstant {
public static final String COLUMN_COUNT = "count";
public static final String COLUMN_TAGS = "tags";
public static final String COLUMN_ATTRIBUTES = "attributes";
+ public static final String QUERY_ID = "queryId";
+ public static final String START_TIME = "startTime";
+ public static final String STATEMENT = "statement";
public static final String COLUMN_ROLE = "role";
public static final String COLUMN_USER = "user";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 460ae25..b11f9d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -320,6 +320,9 @@ public class IoTDBDescriptor {
.getProperty("unseq_file_num_in_each_level",
Integer.toString(conf.getUnseqFileNumInEachLevel()))));
+ conf.setQueryTimeThreshold(Integer.parseInt(properties
+ .getProperty("query_time_threshold", Integer.toString(conf.getQueryTimeThreshold()))));
+
conf.setSyncEnable(Boolean
.parseBoolean(properties.getProperty("is_sync_enable",
Boolean.toString(conf.isSyncEnable()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6349e21..30d5d52 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -83,7 +83,6 @@ import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -394,7 +393,6 @@ public class StorageEngine implements IService {
StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getDeviceId());
- // TODO monitor: update statistics
try {
storageGroupProcessor.insert(insertRowPlan);
if (config.isEnableStatMonitor()) {
@@ -430,7 +428,6 @@ public class StorageEngine implements IService {
+ "failed", insertTabletPlan.getDeviceId()), e);
}
- // TODO monitor: update statistics
storageGroupProcessor.insertTablet(insertTabletPlan);
if (config.isEnableStatMonitor()) {
updateMonitorStatistics(storageGroupProcessor, insertTabletPlan);
@@ -544,14 +541,6 @@ public class StorageEngine implements IService {
}
}
- /**
- * update data.
- */
- public void update(String deviceId, String measurementId, long startTime, long endTime,
- TSDataType type, String v) {
- // TODO
- }
-
public void delete(PartialPath path, long startTime, long endTime, long planIndex)
throws StorageEngineException {
try {
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/exception/QueryIdNotExsitException.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/exception/QueryIdNotExsitException.java
index d900df7..0505722 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/QueryIdNotExsitException.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,26 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.exception;
-public class Config {
+import org.apache.iotdb.rpc.TSStatusCode;
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 10000;
- public static final int DEFAULT_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+public class QueryIdNotExsitException extends IoTDBException {
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
+ public QueryIdNotExsitException(String message) {
+ super(message, TSStatusCode.QUERY_ID_NOT_EXIST.getStatusCode());
+ }
- /**
- * thrift init buffer size, 1KB by default
- */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
-
- /**
- * thrift max frame size (16384000 bytes by default), we change it to 64MB
- */
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
index d900df7..d8ba143 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,26 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.exception.query;
-public class Config {
+/**
+ * This class is used to throw run time exception when query is time out.
+ */
+public class QueryTimeoutRuntimeException extends RuntimeException {
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 10000;
- public static final int DEFAULT_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+ public static final String TIMEOUT_EXCEPTION_MESSAGE
+ = "Current query is time out, please check your statement or modify timeout parameter.";
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
+ public QueryTimeoutRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
- /**
- * thrift init buffer size, 1KB by default
- */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+ public QueryTimeoutRuntimeException(String message) {
+ super(message);
+ }
- /**
- * thrift max frame size (16384000 bytes by default), we change it to 64MB
- */
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
index 87a2adf..f2f4ea9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -25,7 +25,6 @@ import java.util.List;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -41,79 +40,81 @@ import org.slf4j.LoggerFactory;
* PublishHandler handle the messages from MQTT clients.
*/
public class PublishHandler extends AbstractInterceptHandler {
- private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
- private IPlanExecutor executor;
- private PayloadFormatter payloadFormat;
+ private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
- public PublishHandler(IoTDBConfig config) {
- this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
- try {
- this.executor = new PlanExecutor();
- } catch (QueryProcessException e) {
- throw new RuntimeException(e);
- }
- }
+ private IPlanExecutor executor;
+ private PayloadFormatter payloadFormat;
- protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
- this.executor = executor;
- this.payloadFormat = payloadFormat;
+ public PublishHandler(IoTDBConfig config) {
+ this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
+ try {
+ this.executor = new PlanExecutor();
+ } catch (QueryProcessException e) {
+ throw new RuntimeException(e);
}
+ }
- @Override
- public String getID() {
- return "iotdb-mqtt-broker-listener";
- }
+ protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
+ this.executor = executor;
+ this.payloadFormat = payloadFormat;
+ }
- @Override
- public void onPublish(InterceptPublishMessage msg) {
- String clientId = msg.getClientID();
- ByteBuf payload = msg.getPayload();
- String topic = msg.getTopicName();
- String username = msg.getUsername();
- MqttQoS qos = msg.getQos();
+ @Override
+ public String getID() {
+ return "iotdb-mqtt-broker-listener";
+ }
- LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
- clientId, username, qos, topic, payload);
+ @Override
+ public void onPublish(InterceptPublishMessage msg) {
+ String clientId = msg.getClientID();
+ ByteBuf payload = msg.getPayload();
+ String topic = msg.getTopicName();
+ String username = msg.getUsername();
+ MqttQoS qos = msg.getQos();
- List<Message> events = payloadFormat.format(payload);
- if (events == null) {
- return;
- }
+ LOG.debug(
+ "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
+ clientId, username, qos, topic, payload);
+
+ List<Message> events = payloadFormat.format(payload);
+ if (events == null) {
+ return;
+ }
- // since device ids from messages maybe different, so we use the InsertPlan not InsertTabletPlan.
- for (Message event : events) {
- if (event == null) {
- continue;
- }
+ // since device ids from messages maybe different, so we use the InsertPlan not InsertTabletPlan.
+ for (Message event : events) {
+ if (event == null) {
+ continue;
+ }
- InsertRowPlan plan = new InsertRowPlan();
- plan.setTime(event.getTimestamp());
- plan.setMeasurements(event.getMeasurements().toArray(new String[0]));
- plan.setValues(event.getValues().toArray(new Object[0]));
- plan.setDataTypes(new TSDataType[event.getValues().size()]);
- plan.setNeedInferType(true);
+ InsertRowPlan plan = new InsertRowPlan();
+ plan.setTime(event.getTimestamp());
+ plan.setMeasurements(event.getMeasurements().toArray(new String[0]));
+ plan.setValues(event.getValues().toArray(new Object[0]));
+ plan.setDataTypes(new TSDataType[event.getValues().size()]);
+ plan.setNeedInferType(true);
- boolean status = false;
- try {
- plan.setDeviceId(new PartialPath(event.getDevice()));
- status = executeNonQuery(plan);
- } catch (Exception e) {
- LOG.warn(
- "meet error when inserting device {}, measurements {}, at time {}, because ",
- event.getDevice(), event.getMeasurements(), event.getTimestamp(), e);
- }
+ boolean status = false;
+ try {
+ plan.setDeviceId(new PartialPath(event.getDevice()));
+ status = executeNonQuery(plan);
+ } catch (Exception e) {
+ LOG.warn(
+ "meet error when inserting device {}, measurements {}, at time {}, because ",
+ event.getDevice(), event.getMeasurements(), event.getTimestamp(), e);
+ }
- LOG.debug("event process result: {}", status);
- }
+ LOG.debug("event process result: {}", status);
}
+ }
- private boolean executeNonQuery(PhysicalPlan plan)
- throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
- if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
- throw new QueryProcessException(
- "Current system mode is read-only, does not support non-query operation");
- }
- return executor.processNonQuery(plan);
+ private boolean executeNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+ if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+ throw new QueryProcessException(
+ "Current system mode is read-only, does not support non-query operation");
}
+ return executor.processNonQuery(plan);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index d6cd44f..57f3b46 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -174,6 +174,7 @@ public class Planner {
case SHOW_MERGE_STATUS:
case DELETE_PARTITION:
case CREATE_SCHEMA_SNAPSHOT:
+ case KILL:
case CREATE_FUNCTION:
case DROP_FUNCTION:
return operator;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index ecbc667..11776ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -167,6 +167,8 @@ public class SQLConstant {
public static final int TOK_COUNT_DEVICES = 95;
public static final int TOK_COUNT_STORAGE_GROUP = 96;
+ public static final int TOK_QUERY_PROCESSLIST = 97;
+ public static final int TOK_KILL_QUERY = 98;
public static final Map<Integer, String> tokenSymbol = new HashMap<>();
public static final Map<Integer, String> tokenNames = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 2a0d7cb..b31d5aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -39,9 +39,11 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
+import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
import java.io.File;
@@ -77,6 +79,7 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.QueryIdNotExsitException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.UDFRegistrationException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
@@ -122,6 +125,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.MergePlan;
import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
@@ -136,6 +140,8 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.QueryTimeManager.QueryInfo;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.query.dataset.ShowDevicesDataSet;
@@ -296,6 +302,13 @@ public class PlanExecutor implements IPlanExecutor {
throw new QueryProcessException("Create index hasn't been supported yet");
case DROP_INDEX:
throw new QueryProcessException("Drop index hasn't been supported yet");
+ case KILL:
+ try {
+ operateKillQuery((KillQueryPlan) plan);
+ } catch (QueryIdNotExsitException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ return true;
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported", plan.getOperatorType()));
@@ -332,6 +345,32 @@ public class PlanExecutor implements IPlanExecutor {
IoTDB.metaManager.createMTreeSnapshot();
}
+  /**
+   * Kills the query named by the plan, or every registered query when the plan carries the
+   * "not specified" id {@code -1}.
+   *
+   * @param killQueryPlan plan holding the id of the query to kill, or -1 for all queries
+   * @throws QueryIdNotExsitException if a specific id is given but no such query is registered
+   */
+  private void operateKillQuery(KillQueryPlan killQueryPlan) throws QueryIdNotExsitException {
+    QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+    long killQueryId = killQueryPlan.getQueryId();
+    if (killQueryId != -1) {
+      if (!queryTimeManager.getQueryInfoMap().containsKey(killQueryId)) {
+        throw new QueryIdNotExsitException(String
+            .format("Query Id %d is not exist, please check it.", killQueryId));
+      }
+      // killQuery removes the entry from the query info map itself, so it must not be invoked
+      // from inside computeIfPresent on that same map: the nested removal breaks the contract
+      // of the remapping function and lets the map account for the removal twice.
+      queryTimeManager.killQuery(killQueryId);
+      return;
+    }
+    // if queryId is not specified, kill all running queries. Iterate over a snapshot of the
+    // keys because killQuery removes entries while we loop. QueryTimeManager never locks on
+    // the map, so synchronizing on it here would not exclude any other writer.
+    List<Long> queryIdList = new ArrayList<>(queryTimeManager.getQueryInfoMap().keySet());
+    for (Long queryId : queryIdList) {
+      queryTimeManager.killQuery(queryId);
+    }
+  }
+
private void operateTracing(TracingPlan plan) {
IoTDBDescriptor.getInstance().getConfig().setEnablePerformanceTracing(plan.isTracingOn());
}
@@ -455,6 +494,8 @@ public class PlanExecutor implements IPlanExecutor {
return processCountNodes((CountPlan) showPlan);
case MERGE_STATUS:
return processShowMergeStatus();
+ case QUERY_PROCESSLIST:
+ return processShowQueryProcesslist();
case FUNCTIONS:
return processShowFunctions((ShowFunctionsPlan) showPlan);
default:
@@ -1558,6 +1599,21 @@ public class PlanExecutor implements IPlanExecutor {
return record;
}
+  /**
+   * Builds the result set listing every query currently registered in the QueryTimeManager.
+   * Each row uses the query's start time as its timestamp and carries two fields: the query
+   * id (INT64) and the stored statement text (TEXT).
+   *
+   * @return a ListDataSet with one row per registered query
+   */
+  private QueryDataSet processShowQueryProcesslist() {
+    List<PartialPath> headerPaths =
+        Arrays.asList(new PartialPath(QUERY_ID, false), new PartialPath(STATEMENT, false));
+    List<TSDataType> headerTypes = Arrays.asList(TSDataType.INT64, TSDataType.TEXT);
+    ListDataSet processlist = new ListDataSet(headerPaths, headerTypes);
+    for (Entry<Long, QueryInfo> running : QueryTimeManager.getInstance().getQueryInfoMap()
+        .entrySet()) {
+      QueryInfo info = running.getValue();
+      RowRecord row = new RowRecord(info.getStartTime());
+      row.addField(running.getKey(), TSDataType.INT64);
+      row.addField(new Binary(info.getStatement()), TSDataType.TEXT);
+      processlist.putRecord(row);
+    }
+    return processlist;
+  }
+
/**
* @param storageGroups the storage groups to check
* @return List<PartialPath> the storage groups that not exist
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index fad59aa..6c2e49d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -78,7 +78,7 @@ public abstract class Operator {
ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE,
SHOW_MERGE_STATUS, CREATE_SCHEMA_SNAPSHOT, TRACING, DELETE_PARTITION,
UDAF, UDTF, CREATE_FUNCTION, DROP_FUNCTION,
- CREATE_MULTI_TIMESERIES, CREATE_INDEX, DROP_INDEX, QUERY_INDEX,
+ CREATE_MULTI_TIMESERIES, CREATE_INDEX, DROP_INDEX, QUERY_INDEX, KILL,
CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE,
MEASUREMENT_MNODE, STORAGE_GROUP_MNODE,
BATCH_INSERT_ONE_DEVICE;
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/KillQueryOperator.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/qp/logical/sys/KillQueryOperator.java
index d900df7..7634566 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/KillQueryOperator.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,26 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.qp.logical.sys;
-public class Config {
+import org.apache.iotdb.db.qp.logical.RootOperator;
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 10000;
- public static final int DEFAULT_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+public class KillQueryOperator extends RootOperator { // logical operator built by the SQL visitor for a kill-query statement
+ long queryId = -1; // id of the query to kill; stays -1 until setQueryId is called
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
+ public KillQueryOperator(int tokenIntType) { // convenience constructor: operator type defaults to KILL
+ this(tokenIntType, OperatorType.KILL);
+ }
- /**
- * thrift init buffer size, 1KB by default
- */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+ public KillQueryOperator(int tokenIntType, OperatorType operatorType) { // lets the caller choose the operator type
+ super(tokenIntType);
+ this.operatorType = operatorType; // operatorType is not declared here; presumably inherited from Operator
+ }
- /**
- * thrift max frame size (16384000 bytes by default), we change it to 64MB
- */
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+ public void setQueryId(long queryId) { // records the id of the query this operator targets
+ this.queryId = queryId;
+ }
+
+ public long getQueryId() { // returns the target query id, or -1 if none was set
+ return queryId;
+ }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/KillQueryPlan.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/KillQueryPlan.java
index d900df7..f6fa6ad 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/KillQueryPlan.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,26 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.qp.physical.sys;
-public class Config {
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 10000;
- public static final int DEFAULT_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+public class KillQueryPlan extends PhysicalPlan { // physical plan produced from a KillQueryOperator
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
+ private long queryId = -1; // id of the query to kill; always overwritten by the constructor
- /**
- * thrift init buffer size, 1KB by default
- */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+ public KillQueryPlan(long queryId) { // queryId: target query id; the executor treats -1 as "kill all running queries"
+ super(false, OperatorType.KILL); // NOTE(review): the false flag presumably marks this as a non-query plan - confirm in PhysicalPlan
+ this.queryId = queryId;
+ }
- /**
- * thrift max frame size (16384000 bytes by default), we change it to 64MB
- */
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+ @Override
+ public List<PartialPath> getPaths() { // this plan references no paths
+ return Collections.emptyList();
+ }
+
+ public long getQueryId() { // returns the id passed to the constructor
+ return queryId;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 7a5188a..e610877 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -101,6 +101,6 @@ public class ShowPlan extends PhysicalPlan {
public enum ShowContentType {
FLUSH_TASK_INFO, TTL, VERSION, TIMESERIES, STORAGE_GROUP, CHILD_PATH, DEVICES,
COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES, MERGE_STATUS, FUNCTIONS, COUNT_DEVICES,
- COUNT_STORAGE_GROUP
+ COUNT_STORAGE_GROUP, QUERY_PROCESSLIST
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowQueryProcesslistPlan.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowQueryProcesslistPlan.java
index d900df7..124445d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowQueryProcesslistPlan.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,26 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.qp.physical.sys;
-public class Config {
+public class ShowQueryProcesslistPlan extends ShowPlan { // show plan created for the query processlist statement
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 10000;
- public static final int DEFAULT_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+ public ShowQueryProcesslistPlan(ShowContentType showContentType) { // the generator passes ShowContentType.QUERY_PROCESSLIST
+ super(showContentType); // adds no state of its own; the content type is handed to ShowPlan
+ }
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
-
- /**
- * thrift init buffer size, 1KB by default
- */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
-
- /**
- * thrift max frame size (16384000 bytes by default), we change it to 64MB
- */
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index d6c5db2..2910c61 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -22,6 +22,8 @@ import static org.apache.iotdb.db.index.common.IndexConstant.PATTERN;
import static org.apache.iotdb.db.index.common.IndexConstant.THRESHOLD;
import static org.apache.iotdb.db.index.common.IndexConstant.TOP_K;
import static org.apache.iotdb.db.qp.constant.SQLConstant.TIME_PATH;
+import static org.apache.iotdb.db.qp.constant.SQLConstant.TOK_KILL_QUERY;
+import static org.apache.iotdb.db.qp.constant.SQLConstant.TOK_QUERY;
import java.io.File;
import java.time.ZoneId;
@@ -68,6 +70,7 @@ import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.DropFunctionOperator;
import org.apache.iotdb.db.qp.logical.sys.DropIndexOperator;
import org.apache.iotdb.db.qp.logical.sys.FlushOperator;
+import org.apache.iotdb.db.qp.logical.sys.KillQueryOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType;
import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
@@ -145,6 +148,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseParser.IndexWithClauseContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.InsertColumnsSpecContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.InsertStatementContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.InsertValuesSpecContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.KillQueryContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.LastClauseContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.LastElementContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.LimitClauseContext;
@@ -189,6 +193,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowDevicesContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowFlushTaskInfoContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowFunctionsContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowMergeStatusContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowQueryProcesslistContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowStorageGroupContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowTTLStatementContext;
import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowTimeseriesContext;
@@ -737,6 +742,20 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
}
@Override
+ public Operator visitShowQueryProcesslist(ShowQueryProcesslistContext ctx) {
+ return new ShowOperator(SQLConstant.TOK_QUERY_PROCESSLIST);
+ }
+
+  /**
+   * Builds the logical operator for a kill-query statement.
+   *
+   * @param ctx parse context; its optional INT token is the id of the query to kill
+   * @return a KillQueryOperator whose query id is the parsed value, or the operator's
+   *     default of -1 when the statement names no id
+   */
+  @Override
+  public Operator visitKillQuery(KillQueryContext ctx) {
+    KillQueryOperator killQueryOperator = new KillQueryOperator(TOK_KILL_QUERY);
+    if (ctx.INT() != null) {
+      // Query ids are long values (setQueryId takes a long); parsing them as int would
+      // reject every id above Integer.MAX_VALUE with a NumberFormatException.
+      killQueryOperator.setQueryId(Long.parseLong(ctx.INT().getText()));
+    }
+    return killQueryOperator;
+  }
+
+ @Override
public Operator visitShowStorageGroup(ShowStorageGroupContext ctx) {
if (ctx.prefixPath() != null) {
return new ShowStorageGroupOperator(SQLConstant.TOK_STORAGE_GROUP,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index c906dfe..1f85a58 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -28,9 +28,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -55,6 +55,7 @@ import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.DropFunctionOperator;
import org.apache.iotdb.db.qp.logical.sys.DropIndexOperator;
import org.apache.iotdb.db.qp.logical.sys.FlushOperator;
+import org.apache.iotdb.db.qp.logical.sys.KillQueryOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType;
import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
@@ -99,6 +100,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurationPlanType;
import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
@@ -112,11 +114,12 @@ import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowMergeStatusPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
+import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.udf.core.context.UDFContext;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.SchemaUtils;
@@ -279,6 +282,8 @@ public class PhysicalGenerator {
case SQLConstant.TOK_CHILD_PATHS:
return new ShowChildPathsPlan(
ShowContentType.CHILD_PATH, ((ShowChildPathsOperator) operator).getPath());
+ case SQLConstant.TOK_QUERY_PROCESSLIST:
+ return new ShowQueryProcesslistPlan(ShowContentType.QUERY_PROCESSLIST);
case SQLConstant.TOK_SHOW_FUNCTIONS:
return new ShowFunctionsPlan(((ShowFunctionsOperator) operator).showTemporary());
default:
@@ -305,6 +310,8 @@ public class PhysicalGenerator {
return new DeletePartitionPlan(op.getStorageGroupName(), op.getPartitionId());
case CREATE_SCHEMA_SNAPSHOT:
return new CreateSnapshotPlan();
+ case KILL:
+ return new KillQueryPlan(((KillQueryOperator) operator).getQueryId());
case CREATE_FUNCTION:
CreateFunctionOperator createFunctionOperator = (CreateFunctionOperator) operator;
return new CreateFunctionPlan(createFunctionOperator.isTemporary(),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index e1262ca..3344192 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -208,6 +208,9 @@ public class QueryResourceManager {
// close and delete UDF temp files
TemporaryQueryDataFileService.getInstance().deregister(queryId);
+
+ // remove query info in QueryTimeManager
+ QueryTimeManager.getInstance().unRegisterQuery(queryId);
}
private static class QueryTokenManagerHelper {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryTimeManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryTimeManager.java
new file mode 100644
index 0000000..72eed3e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryTimeManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to monitor the executing time of each query.
+ * </p>
+ * Once one is over the threshold, it will be killed and return the time out exception.
+ */
+public class QueryTimeManager implements IService {
+
+ private static final Logger logger = LoggerFactory.getLogger(QueryTimeManager.class);
+
+ /**
+ * the key of queryInfoMap is the query id and the value of queryInfoMap is the start
+ * time, the statement and executing thread of this query.
+ */
+ private Map<Long, QueryInfo> queryInfoMap;
+
+ private ScheduledExecutorService executorService;
+
+ private Map<Long, ScheduledFuture<?>> queryScheduledTaskMap;
+
+ private QueryTimeManager() {
+ queryInfoMap = new ConcurrentHashMap<>();
+ queryScheduledTaskMap = new ConcurrentHashMap<>();
+ executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(1,
+ "query-time-manager");
+ }
+
+ public void registerQuery(long queryId, long startTime, String sql, long timeout,
+ Thread queryThread) {
+ queryInfoMap.put(queryId, new QueryInfo(startTime, sql, queryThread));
+ // submit a scheduled task to judge whether query is still running after timeout
+ ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
+ queryInfoMap.computeIfPresent(queryId, (k, v) -> {
+ killQuery(k);
+ logger.warn(String.format("Query is time out with queryId %d", queryId));
+ return null;
+ });
+ }, timeout, TimeUnit.MILLISECONDS);
+ queryScheduledTaskMap.put(queryId, scheduledFuture);
+ }
+
+ public void killQuery(long queryId) {
+ if (queryInfoMap.get(queryId) == null) {
+ return;
+ }
+ queryInfoMap.get(queryId).getThread().interrupt();
+ unRegisterQuery(queryId);
+ }
+
+ public void unRegisterQuery(long queryId) {
+ if (Thread.interrupted()) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
+ if (queryInfoMap.get(queryId) == null) {
+ return;
+ }
+ queryInfoMap.remove(queryId);
+ queryScheduledTaskMap.get(queryId).cancel(false);
+ queryScheduledTaskMap.remove(queryId);
+ }
+
+ public boolean isQueryInterrupted(long queryId) {
+ return queryInfoMap.get(queryId).getThread().isInterrupted();
+ }
+
+ public Map<Long, QueryInfo> getQueryInfoMap() {
+ return queryInfoMap;
+ }
+
+ public static QueryTimeManager getInstance() {
+ return QueryTimeManagerHelper.INSTANCE;
+ }
+
+ @Override
+ public void start() {
+ // Do Nothing
+ }
+
+ @Override
+ public void stop() {
+ if (executorService == null || executorService.isShutdown()) {
+ return;
+ }
+ executorService.shutdownNow();
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.QUERY_TIME_MANAGER;
+ }
+
+ private static class QueryTimeManagerHelper {
+
+ private static final QueryTimeManager INSTANCE = new QueryTimeManager();
+
+ private QueryTimeManagerHelper() {
+ }
+ }
+
+ public class QueryInfo {
+
+ /**
+ * To reduce the cost of memory, we only keep the a certain size statement.
+ * For statement whose length is over this, we keep its head and tail.
+ */
+ private static final int MAX_STATEMENT_LENGTH = 64;
+
+ private final long startTime;
+ private final String statement;
+ /**
+ * Only main thread is put in this structure since the sub threads are maintained
+ * by the thread pool. The thread allocated for readTask will change every time,
+ * so we have to access this map frequently, which will lead to big performance cost.
+ */
+ private final Thread thread;
+
+ public QueryInfo(long startTime, String statement, Thread thread) {
+ this.startTime = startTime;
+ this.thread = thread;
+ if (statement.length() <= 64) {
+ this.statement = statement;
+ } else {
+ this.statement = statement.substring(0, MAX_STATEMENT_LENGTH / 2) + "..." + statement
+ .substring(statement.length() - MAX_STATEMENT_LENGTH / 2);
+ }
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public String getStatement() {
+ return statement;
+ }
+
+ public Thread getThread() {
+ return thread;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index c286fa3..16e58de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -68,6 +69,10 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
PublicBAOS timeBAOS = new PublicBAOS();
PublicBAOS valueBAOS = new PublicBAOS();
try {
+ if (interrupted) {
+ // nothing is put here since before reading it the main thread will return
+ return;
+ }
synchronized (reader) {
// if the task is submitted, there must be free space in the queue
// so here we don't need to check whether the queue has free space
@@ -189,6 +194,7 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
}
} catch (InterruptedException e) {
LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+ interrupted = true;
Thread.currentThread().interrupt();
} catch (IOException e) {
LOGGER.error("Something gets wrong while reading from the series reader: ", e);
@@ -226,6 +232,11 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
// capacity for blocking queue
private static final int BLOCKING_QUEUE_CAPACITY = 5;
+ /**
+ * flag that main thread is interrupted or not
+ */
+ private volatile boolean interrupted = false;
+
private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
@@ -258,6 +269,11 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
}
private void init(WatermarkEncoder encoder, int fetchSize) {
+ if (Thread.interrupted()) {
+ interrupted = true;
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size());
this.fetchSize = fetchSize;
for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
@@ -286,6 +302,12 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
if (!noMoreDataInQueueArray[seriesIndex]) {
+ // check the interrupted status of main thread before take next batch
+ if (Thread.interrupted()) {
+ interrupted = true;
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex]
.take();
if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 0dccfe2..c1ebc7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -28,6 +28,7 @@ import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -49,14 +50,14 @@ import org.slf4j.LoggerFactory;
public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
DirectAlignByTimeDataSet {
- private static class ReadTask extends WrappedRunnable {
+ private class ReadTask extends WrappedRunnable {
private final ManagedSeriesReader reader;
private final String pathName;
private BlockingQueue<BatchData> blockingQueue;
- public ReadTask(ManagedSeriesReader reader,
- BlockingQueue<BatchData> blockingQueue, String pathName) {
+ public ReadTask(ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue,
+ String pathName) {
this.reader = reader;
this.blockingQueue = blockingQueue;
this.pathName = pathName;
@@ -65,6 +66,10 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
@Override
public void runMayThrow() {
try {
+ // check the status of mainThread before next reading
+ if (interrupted) {
+ return;
+ }
synchronized (reader) {
// if the task is submitted, there must be free space in the queue
// so here we don't need to check whether the queue has free space
@@ -141,6 +146,11 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
// capacity for blocking queue
private static final int BLOCKING_QUEUE_CAPACITY = 5;
+ /**
+ * flag that main thread is interrupted or not
+ */
+ private volatile boolean interrupted = false;
+
private static final QueryTaskPoolManager TASK_POOL_MANAGER = QueryTaskPoolManager.getInstance();
private static final Logger LOGGER = LoggerFactory
@@ -178,6 +188,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
.submit(new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
}
for (int i = 0; i < seriesReaderList.size(); i++) {
+ // check the interrupted status of main thread before taking next batch
+ if (Thread.interrupted()) {
+ interrupted = true;
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
fillCache(i);
// try to put the next timestamp into the heap
if (cachedBatchDataArray[i] != null && cachedBatchDataArray[i].hasCurrent()) {
@@ -283,7 +299,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
// move next
cachedBatchDataArray[seriesIndex].next();
- // get next batch if current batch is empty and still have remaining batch data in queue
+ // check the interrupted status of main thread before taking next batch
+ if (Thread.interrupted()) {
+ interrupted = true;
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
+ // get next batch if current batch is empty and still have remaining batch data in queue
if (!cachedBatchDataArray[seriesIndex].hasCurrent()
&& !noMoreDataInQueueArray[seriesIndex]) {
fillCache(seriesIndex);
@@ -430,6 +452,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
// move next
cachedBatchDataArray[seriesIndex].next();
+ // check the interrupted status of main thread before taking next batch
+ if (Thread.interrupted()) {
+ interrupted = true;
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
// get next batch if current batch is empty and still have remaining batch data in queue
if (!cachedBatchDataArray[seriesIndex].hasCurrent()
&& !noMoreDataInQueueArray[seriesIndex]) {
@@ -437,7 +465,10 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
fillCache(seriesIndex);
} catch (InterruptedException e) {
LOGGER.error("Interrupted while taking from the blocking queue: ", e);
+ interrupted = true;
Thread.currentThread().interrupt();
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
} catch (IOException e) {
LOGGER.error("Got IOException", e);
throw e;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index a4cb4f9..ea12e37 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -70,7 +71,8 @@ public class RawDataQueryExecutor {
queryPlan.getDeduplicatedDataTypes(), readersOfSelectedSeries, queryPlan.isAscending());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new StorageEngineException(e.getMessage());
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
} catch (IOException e) {
throw new StorageEngineException(e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index b433bae..0d15e76 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -173,6 +174,10 @@ public class SeriesReader {
}
boolean hasNextFile() throws IOException {
+ if (Thread.interrupted()) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
if (!unSeqPageReaders.isEmpty()
|| firstPageReader != null
@@ -232,6 +237,11 @@ public class SeriesReader {
* overlapped chunks are consumed
*/
boolean hasNextChunk() throws IOException {
+ if (Thread.interrupted()) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
+
if (!unSeqPageReaders.isEmpty()
|| firstPageReader != null
|| mergeReader.hasNextTimeValuePair()) {
@@ -351,6 +361,10 @@ public class SeriesReader {
@SuppressWarnings("squid:S3776")
// Suppress high Cognitive Complexity warning
boolean hasNextPage() throws IOException {
+ if (Thread.interrupted()) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
/*
* has overlapped data before
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 835201a..7757dd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -45,6 +45,7 @@ public enum ServiceType {
TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
CACHE_HIT_RATIO_DISPLAY_SERVICE("CACHE_HIT_RATIO_DISPLAY_SERVICE",
generateJmxName(IoTDBConstant.IOTDB_PACKAGE, "Cache Hit Ratio")),
+ QUERY_TIME_MANAGER("Query time manager", "Query time"),
FLUSH_SERVICE("Flush ServerService",
generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager")),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0ad6020..261d0f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metrics.server.SqlArgument;
@@ -88,9 +89,11 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.control.TracingManager;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
@@ -200,6 +203,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private static final AtomicInteger queryCount = new AtomicInteger(0);
+ private QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+
public TSServiceImpl() throws QueryProcessException {
processor = new Planner();
executor = new PlanExecutor();
@@ -470,7 +475,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return physicalPlan.isQuery()
? internalExecuteQueryStatement(statement, req.statementId, physicalPlan,
- req.fetchSize, sessionIdUsernameMap.get(req.getSessionId()))
+ req.fetchSize, req.timeout, sessionIdUsernameMap.get(req.getSessionId()))
: executeUpdateStatement(physicalPlan, req.getSessionId());
} catch (Exception e) {
return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement"));
@@ -491,7 +496,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return physicalPlan.isQuery()
? internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
- sessionIdUsernameMap.get(req.getSessionId()))
+ req.timeout, sessionIdUsernameMap.get(req.getSessionId()))
: RpcUtils.getTSExecuteStatementResp(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
} catch (Exception e) {
@@ -511,7 +516,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
.rawDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
return physicalPlan.isQuery()
? internalExecuteQueryStatement("", req.statementId, physicalPlan, req.fetchSize,
- sessionIdUsernameMap.get(req.getSessionId()))
+ config.getQueryTimeThreshold(), sessionIdUsernameMap.get(req.getSessionId()))
: RpcUtils.getTSExecuteStatementResp(TSStatusCode.EXECUTE_STATEMENT_ERROR,
"Statement is not a query statement.");
} catch (Exception e) {
@@ -525,8 +530,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
* some AuthorPlan
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
- long statementId, PhysicalPlan plan, int fetchSize, String username)
+ private TSExecuteStatementResp internalExecuteQueryStatement(String statement, long statementId,
+ PhysicalPlan plan, int fetchSize, long timeout, String username)
throws QueryProcessException, SQLException, StorageEngineException, QueryFilterOptimizationException, MetadataException, IOException, InterruptedException, TException, AuthException {
queryCount.incrementAndGet();
AUDIT_LOGGER.debug("Session {} execute Query: {}", currSessionId.get(), statement);
@@ -579,12 +584,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// generate the queryId for the operation
queryId = generateQueryId(true, fetchSize, deduplicatedPathNum);
+ // register query info to queryTimeManager
+ if (!(plan instanceof ShowQueryProcesslistPlan)) {
+ queryTimeManager
+ .registerQuery(queryId, startTime, statement, timeout, Thread.currentThread());
+ }
if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
+ TracingManager tracingManager = TracingManager.getInstance();
if (!(plan instanceof AlignByDevicePlan)) {
- TracingManager.getInstance()
- .writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
+ tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
} else {
- TracingManager.getInstance().writeQueryInfo(queryId, statement, startTime);
+ tracingManager.writeQueryInfo(queryId, statement, startTime);
}
}
@@ -611,7 +621,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setOperationType(plan.getOperatorType().toString());
if (plan.getOperatorType() == OperatorType.AGGREGATION) {
resp.setIgnoreTimeStamp(true);
- } // else default ignoreTimeStamp is false
+ } else if (plan instanceof ShowQueryProcesslistPlan) {
+ resp.setIgnoreTimeStamp(false);
+ }
if (newDataSet instanceof DirectNonAlignDataSet) {
resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
@@ -636,6 +648,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
+ // remove query info in QueryTimeManager
+ if (!(plan instanceof ShowQueryProcesslistPlan)) {
+ queryTimeManager.unRegisterQuery(queryId);
+ }
return resp;
} catch (Exception e) {
releaseQueryResourceNoExceptions(queryId);
@@ -837,6 +853,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
}
+ // register query info to queryTimeManager
+ queryTimeManager
+ .registerQuery(req.queryId, System.currentTimeMillis(), req.statement, req.timeout,
+ Thread.currentThread());
+
QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
if (req.isAlign) {
TSQueryDataSet result =
@@ -849,6 +870,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setHasResultSet(hasResultSet);
resp.setQueryDataSet(result);
resp.setIsAlign(true);
+
+ queryTimeManager.unRegisterQuery(req.queryId);
return resp;
} else {
TSQueryNonAlignDataSet nonAlignResult =
@@ -868,6 +891,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setHasResultSet(hasResultSet);
resp.setNonAlignQueryDataSet(nonAlignResult);
resp.setIsAlign(false);
+
+ queryTimeManager.unRegisterQuery(req.queryId);
return resp;
}
} catch (Exception e) {
@@ -888,9 +913,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(
int fetchSize, QueryDataSet queryDataSet, String userName)
- throws TException, AuthException, InterruptedException, IOException, QueryProcessException {
+ throws TException, AuthException, IOException, QueryProcessException {
WatermarkEncoder encoder = getWatermarkEncoder(userName);
- return ((DirectNonAlignDataSet) queryDataSet).fillBuffer(fetchSize, encoder);
+ try {
+ return ((DirectNonAlignDataSet) queryDataSet).fillBuffer(fetchSize, encoder);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+ }
}
private WatermarkEncoder getWatermarkEncoder(String userName) throws TException, AuthException {
@@ -1573,7 +1604,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
private TSStatus tryCatchQueryException(Exception e) {
- if (e instanceof ParseCancellationException) {
+ if (e instanceof QueryTimeoutRuntimeException) {
+ DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(e.getMessage(), e);
+ // just recover the state of thread here
+ if (Thread.interrupted()) {
+ DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn("Recover the state of the thread interrupted");
+ }
+ return RpcUtils.getStatus(TSStatusCode.TIME_OUT, getRootCause(e));
+ } else if (e instanceof ParseCancellationException) {
DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_PARSING_SQL_ERROR, e);
return RpcUtils
.getStatus(TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR + getRootCause(e));
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
index 8be642c..c588f94 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
@@ -41,7 +41,6 @@ import static org.junit.Assert.fail;
public class IOTDBGroupByIT {
-
private static String[] dataSet1 = new String[]{
"SET STORAGE GROUP TO root.ln.wf01.wt01",
"CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBKillQueryTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBKillQueryTest.java
new file mode 100644
index 0000000..86cb6ac
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBKillQueryTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBKillQueryTest {
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ /**
+ * Test killing a query with a specified query id that does not exist.
+ */
+ @Test
+ public void killQueryTest1() {
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ statement.execute("kill query 998");
+ fail("QueryIdNotExistException is not thrown");
+ } catch (IoTDBSQLException e) {
+ Assert.assertTrue(e.getMessage().contains("Query Id 998 is not exist"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test killing query without explicit query id. It's supposed to run successfully.
+ */
+ @Test
+ public void killQueryTest2() {
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("kill query");
+ Assert.assertEquals(false, hasResultSet);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryTimeoutTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryTimeoutTest.java
new file mode 100644
index 0000000..6a2bb99
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryTimeoutTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
+import org.apache.iotdb.jdbc.IoTDBStatement;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class IoTDBQueryTimeoutTest {
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ prepareData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ /**
+ * Test show query processlist; the result set is supposed to be empty.
+ */
+ @Test
+ public void queryProcessListTest() {
+ String headerResult = "Time, queryId, statement, ";
+
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("show query processlist");
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ Assert.assertEquals(3, metaData.getColumnCount());
+ StringBuilder headerBuilder = new StringBuilder();
+ for (int i = 1; i <= metaData.getColumnCount(); i++) {
+ headerBuilder.append(metaData.getColumnName(i)).append(", ");
+ }
+ Assert.assertEquals(headerResult, headerBuilder.toString());
+
+ int cnt = 0;
+ while (resultSet.next()) {
+ cnt++;
+ }
+ Assert.assertEquals(0, cnt);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test query with timeout, which is supposed to throw a QueryTimeoutRuntimeException.
+ * Note: This test is not guaranteed to time out.
+ */
+ @Test
+ public void queryWithTimeoutTest() {
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ statement.setFetchSize(20000);
+ try {
+ ((IoTDBStatement) statement)
+ .executeQuery("select count(*) from root group by ([1, 40000), 2ms)", 1);
+ fail("QueryTimeoutRuntimeException is not thrown");
+ } catch (IoTDBSQLException e) {
+ Assert.assertTrue(e.getMessage().contains("Current query is time out"));
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test executing a query after a timed-out query; the second query is supposed to execute
+ * correctly. The first query is not guaranteed to time out, so its error message is only
+ * checked when an IoTDBSQLException is actually thrown.
+ */
+ @Test
+ public void queryAfterTimeoutQueryTest() {
+ try (Connection connection = DriverManager.
+ getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ statement.setFetchSize(20000);
+ try {
+ ((IoTDBStatement) statement)
+ .executeQuery("select count(*) from root group by ([1, 20000), 2ms)", 1);
+ } catch (IoTDBSQLException e) {
+ Assert.assertTrue(e.getMessage().contains("Current query is time out"));
+ }
+
+ // prepareData() writes timestamps 0..40000 to root.sg1.d1.s1, so query that device;
+ // a device that was never written yields no rows and the check below would be vacuous
+ boolean hasResultSet = statement.execute("select max_time(s1) from root.sg1.d1");
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ while (resultSet.next()) {
+ Assert.assertEquals(40000, resultSet.getLong("max_time(root.sg1.d1.s1)"));
+ cnt++;
+ }
+ // an aggregation without group by returns exactly one row
+ Assert.assertEquals(1, cnt);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static void prepareData() {
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (int i = 0; i <= 40000; i++) {
+ statement.execute(String.format("insert into root.sg1.d1(time,s1) values(%d,%d)", i, i));
+ statement.execute(String.format("insert into root.sg2.d2(time,s2) values(%d,%d)", i, i));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 86757ba..8ed69c2 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -53,6 +53,7 @@ public class IoTDBRpcDataSet {
public Map<String, Integer> columnOrdinalMap; // used because the server returns deduplicated columns
public List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList
public int fetchSize;
+ public final long timeout;
public boolean emptyResultSet = false;
public boolean hasCachedRecord = false;
public boolean lastReadWasNull;
@@ -77,13 +78,14 @@ public class IoTDBRpcDataSet {
public IoTDBRpcDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet,
- int fetchSize) {
+ int fetchSize, long timeout) {
this.sessionId = sessionId;
this.ignoreTimeStamp = ignoreTimeStamp;
this.sql = sql;
this.queryId = queryId;
this.client = client;
this.fetchSize = fetchSize;
+ this.timeout = timeout;
columnSize = columnNameList.size();
this.columnNameList = new ArrayList<>();
@@ -200,7 +202,8 @@ public class IoTDBRpcDataSet {
public boolean fetchResults() throws StatementExecutionException, IoTDBConnectionException {
rowsIndex = 0;
- TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
+ TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true,
+ timeout);
try {
TSFetchResultsResp resp = client.fetchResults(req);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b839c53..a6f1ee5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -60,6 +60,7 @@ public enum TSStatusCode {
QUERY_PROCESS_ERROR(411),
WRITE_PROCESS_ERROR(412),
WRITE_PROCESS_REJECT(413),
+ QUERY_ID_NOT_EXIST(414),
UNSUPPORTED_INDEX_FUNC_ERROR(421),
UNSUPPORTED_INDEX_TYPE_ERROR(422),
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index d900df7..c879a2d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -23,7 +23,8 @@ public class Config {
public static final String DEFAULT_USER = "root";
public static final String DEFAULT_PASSWORD = "root";
public static final int DEFAULT_FETCH_SIZE = 10000;
- public static final int DEFAULT_TIMEOUT_MS = 0;
+ public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
+ public static final int DEFAULT_QUERY_TIMEOUT_MS = 60000;
public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
public static final int RETRY_NUM = 3;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index a27dd31..95b0242 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -135,11 +135,11 @@ public class Session {
}
public synchronized void open() throws IoTDBConnectionException {
- open(false, Config.DEFAULT_TIMEOUT_MS);
+ open(false, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException {
- open(enableRPCCompression, Config.DEFAULT_TIMEOUT_MS);
+ open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
}
private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
@@ -296,7 +296,7 @@ public class Session {
}
/**
- * execure query sql
+ * execute query sql
*
* @param sql query statement
* @return result set
@@ -307,6 +307,21 @@ public class Session {
}
/**
+ * execute query sql with explicit timeout
+ *
+ * @param sql query statement
+ * @param timeout the timeout of this query, in milliseconds
+ * @return result set
+ */
+ public SessionDataSet executeQueryStatement(String sql, long timeout)
+ throws StatementExecutionException, IoTDBConnectionException {
+ if (timeout <= 0) {
+ throw new StatementExecutionException("Timeout must be over 0, please check and try again.");
+ }
+ return defaultSessionConnection.executeQueryStatement(sql, timeout);
+ }
+
+ /**
* execute non query statement
*
* @param sql non query statement
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 5ad0b93..8dacf21 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -258,8 +258,14 @@ public class SessionConnection {
protected SessionDataSet executeQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
+ return this.executeQueryStatement(sql, Config.DEFAULT_QUERY_TIMEOUT_MS);
+ }
+
+ protected SessionDataSet executeQueryStatement(String sql, long timeout)
+ throws StatementExecutionException, IoTDBConnectionException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
execReq.setFetchSize(session.fetchSize);
+ execReq.setTimeout(timeout);
TSExecuteStatementResp execResp;
try {
execResp = client.executeQueryStatement(execReq);
@@ -282,9 +288,10 @@ public class SessionConnection {
return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
execResp.columnNameIndexMap,
execResp.getQueryId(), client, sessionId, execResp.queryDataSet,
- execResp.isIgnoreTimeStamp());
+ execResp.isIgnoreTimeStamp(), timeout);
}
+
protected void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index dbe078d..5caadba 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -46,7 +46,17 @@ public class SessionDataSet {
long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet,
boolean ignoreTimeStamp) {
this.ioTDBRpcDataSet = new IoTDBRpcDataSet(sql, columnNameList, columnTypeList, columnNameIndex,
- ignoreTimeStamp, queryId, client, sessionId, queryDataSet, Config.DEFAULT_FETCH_SIZE);
+ ignoreTimeStamp, queryId, client, sessionId, queryDataSet, Config.DEFAULT_FETCH_SIZE,
+ Config.DEFAULT_QUERY_TIMEOUT_MS);
+ }
+
+ public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
+ Map<String, Integer> columnNameIndex,
+ long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet,
+ boolean ignoreTimeStamp, long timeout) {
+ this.ioTDBRpcDataSet = new IoTDBRpcDataSet(sql, columnNameList, columnTypeList, columnNameIndex,
+ ignoreTimeStamp, queryId, client, sessionId, queryDataSet, Config.DEFAULT_FETCH_SIZE,
+ timeout);
}
public int getFetchSize() {
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
index 439be74..b237437 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
@@ -242,6 +242,35 @@ public class IoTDBSessionIteratorIT {
}
}
+ /**
+ * Test executeQueryStatement with a timeout; the query is expected to finish before it expires.
+ */
+ @Test
+ public void queryWithTimeoutTest() {
+ String[] retArray = new String[]{
+ "9,root.sg1.d1.s1,false"
+ };
+
+ try {
+ SessionDataSet sessionDataSet = session
+ .executeQueryStatement("select last s1 from root.sg1.d1", 2000);
+ sessionDataSet.setFetchSize(1024);
+ DataIterator iterator = sessionDataSet.iterator();
+ int count = 0;
+ while (iterator.next()) {
+ String ans = String.format("%s,%s,%s", iterator.getLong(1), iterator.getString(2),
+ iterator.getString(3));
+ assertEquals(retArray[count], ans);
+ count++;
+ }
+ assertEquals(retArray.length, count);
+ sessionDataSet.closeOperationHandle();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
private void prepareData() throws IoTDBConnectionException, StatementExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 4f30d06..a1a7ea5 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -115,6 +115,8 @@ struct TSExecuteStatementReq {
3: required i64 statementId
4: optional i32 fetchSize
+
+ 5: optional i64 timeout
}
struct TSExecuteBatchStatementReq{
@@ -153,6 +155,7 @@ struct TSFetchResultsReq{
3: required i32 fetchSize
4: required i64 queryId
5: required bool isAlign
+ 6: required i64 timeout
}
struct TSFetchResultsResp{
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/QueryTimeoutRuntimeException.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/exception/QueryTimeoutRuntimeException.java
index d900df7..1f062c5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/QueryTimeoutRuntimeException.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,26 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.session;
+package org.apache.iotdb.tsfile.exception;
-public class Config {
+/**
+ * This class is used to throw a runtime exception when a query times out.
+ */
+public class QueryTimeoutRuntimeException extends RuntimeException {
- public static final String DEFAULT_USER = "root";
- public static final String DEFAULT_PASSWORD = "root";
- public static final int DEFAULT_FETCH_SIZE = 10000;
- public static final int DEFAULT_TIMEOUT_MS = 0;
- public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+ public static final String TIMEOUT_EXCEPTION_MESSAGE
+ = "Current query is time out, please check your statement or modify timeout parameter.";
- public static final int RETRY_NUM = 3;
- public static final long RETRY_INTERVAL_MS = 1000;
+ public QueryTimeoutRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
- /**
- * thrift init buffer size, 1KB by default
- */
- public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+ public QueryTimeoutRuntimeException(String message) {
+ super(message);
+ }
- /**
- * thrift max frame size (16384000 bytes by default), we change it to 64MB
- */
- public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
index de314b1..0f4f531 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
@@ -22,9 +22,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import org.apache.iotdb.tsfile.exception.QueryTimeoutRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +46,9 @@ public class LocalTsFileInput implements TsFileInput {
public long size() throws IOException {
try {
return channel.size();
+ } catch (ClosedByInterruptException e) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
} catch (IOException e) {
logger.error("Error happened while getting {} size", filePath);
throw e;
@@ -54,6 +59,9 @@ public class LocalTsFileInput implements TsFileInput {
public long position() throws IOException {
try {
return channel.position();
+ } catch (ClosedByInterruptException e) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
} catch (IOException e) {
logger.error("Error happened while getting {} current position", filePath);
throw e;
@@ -65,6 +73,9 @@ public class LocalTsFileInput implements TsFileInput {
try {
channel.position(newPosition);
return this;
+ } catch (ClosedByInterruptException e) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
} catch (IOException e) {
logger.error("Error happened while changing {} position to {}", filePath, newPosition);
throw e;
@@ -75,6 +86,9 @@ public class LocalTsFileInput implements TsFileInput {
public int read(ByteBuffer dst) throws IOException {
try {
return channel.read(dst);
+ } catch (ClosedByInterruptException e) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
} catch (IOException e) {
logger.error("Error happened while reading {} from current position", filePath);
throw e;
@@ -85,6 +99,9 @@ public class LocalTsFileInput implements TsFileInput {
public int read(ByteBuffer dst, long position) throws IOException {
try {
return channel.read(dst, position);
+ } catch (ClosedByInterruptException e) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
} catch (IOException e) {
logger.error("Error happened while reading {} from position {}", filePath, position);
throw e;
@@ -115,6 +132,9 @@ public class LocalTsFileInput implements TsFileInput {
public void close() throws IOException {
try {
channel.close();
+ } catch (ClosedByInterruptException e) {
+ throw new QueryTimeoutRuntimeException(
+ QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
} catch (IOException e) {
logger.error("Error happened while closing {}", filePath);
throw e;