You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/06 01:48:31 UTC
[iotdb] branch master updated: [IOTDB-4817] Support kill query in cluster
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6e98062c1f [IOTDB-4817] Support kill query in cluster
6e98062c1f is described below
commit 6e98062c1fcdc95534f195d4d9b3a9e0647d3ad2
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Fri Jan 6 09:48:26 2023 +0800
[IOTDB-4817] Support kill query in cluster
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 2 +-
.../confignode/client/DataNodeRequestType.java | 5 +-
.../client/async/AsyncDataNodeClientPool.java | 6 ++
.../client/async/handlers/AsyncClientHandler.java | 1 +
.../client/sync/SyncDataNodeClientPool.java | 2 +
.../iotdb/confignode/manager/ConfigManager.java | 8 +++
.../apache/iotdb/confignode/manager/IManager.java | 2 +
.../iotdb/confignode/manager/node/NodeManager.java | 38 +++++++++++++
.../thrift/ConfigNodeRPCServiceProcessor.java | 5 ++
.../Maintenance-Tools/Maintenance-Command.md | 14 ++++-
docs/UserGuide/Reference/Status-Codes.md | 2 +
.../Maintenance-Tools/Maintenance-Command.md | 16 +++++-
docs/zh/UserGuide/Reference/Status-Codes.md | 2 +
.../apache/iotdb/db/client/ConfigNodeClient.java | 21 +++++++
.../query/KilledByOthersException.java} | 47 +++-------------
.../iotdb/db/mpp/execution/QueryIdGenerator.java | 2 +-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 9 +++
.../db/mpp/plan/execution/IQueryExecution.java | 2 +
.../db/mpp/plan/execution/QueryExecution.java | 9 +++
.../mpp/plan/execution/config/ConfigExecution.java | 5 ++
.../plan/execution/config/ConfigTaskVisitor.java | 7 +++
.../config/executor/ClusterConfigTaskExecutor.java | 34 ++++++++++++
.../config/executor/IConfigTaskExecutor.java | 3 +
.../plan/execution/config/sys/KillQueryTask.java | 42 ++++++++++++++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 10 ++++
.../db/mpp/plan/statement/StatementVisitor.java | 5 ++
.../mpp/plan/statement/sys/KillQueryStatement.java | 64 ++++++++++++++++++++++
.../impl/DataNodeInternalRPCServiceImpl.java | 19 +++++++
.../execution/operator/MergeSortOperatorTest.java | 3 +
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 +
.../src/main/thrift/confignode.thrift | 3 +
thrift/src/main/thrift/datanode.thrift | 2 +
32 files changed, 346 insertions(+), 46 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index ba9b4d6d8f..0d6a2cc46f 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -741,7 +741,7 @@ showQueries
// Kill Query
killQuery
- : KILL QUERY INTEGER_LITERAL?
+ : KILL (QUERY queryId=STRING_LITERAL | ALL QUERIES)
;
// Grant Watermark Embedding
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index ff236ce339..335a818574 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -85,5 +85,8 @@ public enum DataNodeRequestType {
CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
DEACTIVATE_TEMPLATE,
- COUNT_PATHS_USING_TEMPLATE
+ COUNT_PATHS_USING_TEMPLATE,
+
+ /** @TODO Need to migrate to 'Node Maintenance' */
+ KILL_QUERY_INSTANCE
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 7f0404cc69..097753ccbd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -324,6 +324,12 @@ public class AsyncDataNodeClientPool {
(CountPathsUsingTemplateRPCHandler)
clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
break;
+ case KILL_QUERY_INSTANCE:
+ client.killQueryInstance(
+ (String) clientHandler.getRequest(requestId),
+ (AsyncTSStatusRPCHandler)
+ clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+ break;
default:
LOGGER.error(
"Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 43b8ad82e4..a5b9422ac8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -215,6 +215,7 @@ public class AsyncClientHandler<Q, R> {
case INVALIDATE_MATCHED_SCHEMA_CACHE:
case UPDATE_TEMPLATE:
case CHANGE_REGION_LEADER:
+ case KILL_QUERY_INSTANCE:
default:
return new AsyncTSStatusRPCHandler(
requestType,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 3136e05da1..dfa02ed192 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -121,6 +121,8 @@ public class SyncDataNodeClientPool {
return client.stopDataNode();
case SET_SYSTEM_STATUS:
return client.setSystemStatus((String) req);
+ case KILL_QUERY_INSTANCE:
+ return client.killQueryInstance((String) req);
case UPDATE_TEMPLATE:
return client.updateTemplate((TUpdateTemplateReq) req);
case CREATE_NEW_REGION_PEER:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a5ea78ed40..47ece8115c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1107,6 +1107,14 @@ public class ConfigManager implements IManager {
: status;
}
+ @Override
+ public TSStatus killQuery(String queryId, int dataNodeId) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? nodeManager.killQuery(queryId, dataNodeId)
+ : status;
+ }
+
@Override
public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index c6d38aab6d..913e7bb105 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -411,6 +411,8 @@ public interface IManager {
/** TestOnly. Set the target DataNode to the specified status */
TSStatus setDataNodeStatus(TSetDataNodeStatusReq req);
+ TSStatus killQuery(String queryId, int dataNodeId);
+
TGetDataNodeLocationsResp getRunningDataNodeLocations();
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 866ceeab96..1c4c5bb401 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -674,6 +674,44 @@ public class NodeManager {
DataNodeRequestType.SET_SYSTEM_STATUS);
}
+ /**
+ * Kill query on DataNode
+ *
+ * @param queryId the id of the specific query to be killed; it is NULL when killing all queries
+ * @param dataNodeId the id of the DataNode that owns the target query; a negative value (-1 by
+ * convention) means all queries on all DataNodes will be killed
+ */
+ public TSStatus killQuery(String queryId, int dataNodeId) {
+ if (dataNodeId < 0) {
+ return killAllQueries();
+ } else {
+ return killSpecificQuery(queryId, getRegisteredDataNodeLocations().get(dataNodeId));
+ }
+ }
+
+ private TSStatus killAllQueries() {
+ Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ configManager.getNodeManager().getRegisteredDataNodeLocations();
+ AsyncClientHandler<String, TSStatus> clientHandler =
+ new AsyncClientHandler<>(DataNodeRequestType.KILL_QUERY_INSTANCE, dataNodeLocationMap);
+ AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+ return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
+ }
+
+ private TSStatus killSpecificQuery(String queryId, TDataNodeLocation dataNodeLocation) {
+ if (dataNodeLocation == null) {
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+ .setMessage(
+ "The target DataNode is not existed, please ensure your input <queryId> is correct");
+ } else {
+ return SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ dataNodeLocation.getInternalEndPoint(),
+ queryId,
+ DataNodeRequestType.KILL_QUERY_INSTANCE);
+ }
+ }
+
/** Start the heartbeat service */
public void startHeartbeatService() {
synchronized (scheduleMonitor) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index f2973db275..f3091c0066 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -643,6 +643,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.setDataNodeStatus(req);
}
+ @Override
+ public TSStatus killQuery(String queryId, int dataNodeId) {
+ return configManager.killQuery(queryId, dataNodeId);
+ }
+
@Override
public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
return configManager.getRunningDataNodeLocations();
diff --git a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
index 11ffcdd7a0..da03e2fd66 100644
--- a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -95,14 +95,24 @@ session.executeQueryStatement(String sql, long timeout)
In addition to waiting for the query to time out passively, IoTDB also supports stopping the query actively:
+#### Kill specific query
+
```sql
KILL QUERY <queryId>
```
-You can abort the specified query by specifying `queryId`. If `queryId` is not specified, all executing queries will be killed.
+You can kill the specified query by specifying `queryId`.
To get the executing `queryId`,you can use the [show queries](#show-queries) command, which will show the list of all executing queries.
+#### Kill all queries
+
+```sql
+KILL ALL QUERIES
+```
+
+Kill all queries on all DataNodes.
+
## SHOW QUERIES
This command is used to display all ongoing queries, here are usage scenarios:
@@ -164,7 +174,7 @@ SQL result:
SQL string:
```SQL
-SHOW QUERIES ORDER BY Time DESC limit 5
+SHOW QUERIES limit 5
```
SQL result:
diff --git a/docs/UserGuide/Reference/Status-Codes.md b/docs/UserGuide/Reference/Status-Codes.md
index 57b97f79b2..c770b30c20 100644
--- a/docs/UserGuide/Reference/Status-Codes.md
+++ b/docs/UserGuide/Reference/Status-Codes.md
@@ -107,6 +107,8 @@ Here is a list of Status Code and related message:
| 711 | TSBLOCK_SERIALIZE_ERROR | TsBlock serialization error |
| 712 | INTERNAL_REQUEST_TIME_OUT | MPP Operation timeout |
| 713 | INTERNAL_REQUEST_RETRY_ERROR | Internal operation retry failed |
+| 714 | NO_SUCH_QUERY | Cannot find target query |
+| 715 | QUERY_WAS_KILLED | Query was killed during execution |
| 800 | UNINITIALIZED_AUTH_ERROR | Failed to initialize auth module |
| 801 | WRONG_LOGIN_PASSWORD | Username or password is wrong |
| 802 | NOT_LOGIN | Not login |
diff --git a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
index d31fcfe19f..2cc895eba2 100644
--- a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -91,16 +91,26 @@ session.executeQueryStatement(String sql, long timeout)
### 查询终止
-除了被动地等待查询超时外,IoTDB 还支持主动地终止查询,命令为:
+除了被动地等待查询超时外,IoTDB 还支持主动地终止查询:
+
+#### 终止指定查询
```sql
KILL QUERY <queryId>
```
-通过指定 `queryId` 可以中止指定的查询,而如果不指定 `queryId`,将中止所有正在执行的查询。
+通过指定 `queryId` 可以中止指定的查询。
为了获取正在执行的查询 id,用户可以使用 [show queries](#show-queries) 命令,该命令将显示所有正在执行的查询列表。
+#### 终止所有查询
+
+```sql
+KILL ALL QUERIES
+```
+
+终止所有DataNode上的所有查询。
+
## SHOW QUERIES
该命令用于显示所有正在执行的查询,有以下使用场景:
@@ -162,7 +172,7 @@ SHOW QUERIES WHERE ElapsedTime > 30
SQL 语句为:
```SQL
-SHOW QUERIES ORDER BY Time DESC limit 5
+SHOW QUERIES limit 5
```
该 SQL 语句的执行结果如下:
diff --git a/docs/zh/UserGuide/Reference/Status-Codes.md b/docs/zh/UserGuide/Reference/Status-Codes.md
index 6387647bbc..9f4cd2c5e0 100644
--- a/docs/zh/UserGuide/Reference/Status-Codes.md
+++ b/docs/zh/UserGuide/Reference/Status-Codes.md
@@ -108,6 +108,8 @@ try {
| 711 | TSBLOCK_SERIALIZE_ERROR | TsBlock 序列化错误 |
| 712 | INTERNAL_REQUEST_TIME_OUT | MPP 操作超时 |
| 713 | INTERNAL_REQUEST_RETRY_ERROR | 内部操作重试失败 |
+| 714 | NO_SUCH_QUERY | 查询不存在 |
+| 715 | QUERY_WAS_KILLED | 查询执行时被终止 |
| 800 | UNINITIALIZED_AUTH_ERROR | 授权模块未初始化 |
| 801 | WRONG_LOGIN_PASSWORD | 用户名或密码错误 |
| 802 | NOT_LOGIN | 没有登录 |
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index a0964e739d..89cb68600f 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -1039,6 +1039,27 @@ public class ConfigNodeClient
throw new TException("DataNode to ConfigNode client doesn't support setDataNodeStatus.");
}
+ @Override
+ public TSStatus killQuery(String queryId, int dataNodeId) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.killQuery(queryId, dataNodeId);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ logger.warn(
+ "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+ configNode,
+ config.getAddressAndPort(),
+ Thread.currentThread().getStackTrace()[1].getMethodName());
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TGetDataNodeLocationsResp getRunningDataNodeLocations() throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/exception/query/KilledByOthersException.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
copy to server/src/main/java/org/apache/iotdb/db/exception/query/KilledByOthersException.java
index 1d9f464667..3843c92eae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/KilledByOthersException.java
@@ -15,49 +15,20 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
-package org.apache.iotdb.db.mpp.plan.execution;
+package org.apache.iotdb.db.exception.query;
import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import java.nio.ByteBuffer;
-import java.util.Optional;
-
-public interface IQueryExecution {
-
- void start();
-
- void stop();
-
- void stopAndCleanup();
-
- ExecutionResult getStatus();
-
- Optional<TsBlock> getBatchResult() throws IoTDBException;
-
- Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException;
-
- boolean hasNextResult();
-
- int getOutputValueColumnCount();
-
- DatasetHeader getDatasetHeader();
-
- boolean isQuery();
-
- String getQueryId();
-
- long getStartExecutionTime();
-
- void recordExecutionTime(long executionTime);
+import org.apache.iotdb.rpc.TSStatusCode;
- long getTotalExecutionTime();
+public class KilledByOthersException extends IoTDBException {
+ private static final long serialVersionUID = -6027957067833327712L;
- Optional<String> getExecuteSQL();
+ public static final String MESSAGE = "Query was killed by others";
- Statement getStatement();
+ public KilledByOthersException() {
+ super(MESSAGE, TSStatusCode.QUERY_WAS_KILLED.getStatusCode(), true);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
index bc814391d5..49ad488d08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
@@ -66,7 +66,7 @@ public class QueryIdGenerator {
}
/**
- * Generate next queryId using the following format: {@code YYYYMMDD_hhmmss_index_dataNodeId}
+ * Generate next queryId using the following format: {@code yyyyMMdd_HHmmss_index_dataNodeId}
*
* <p>{@code index} rolls at the start of every day or when it is close to reaching {@code
* 99,999}. {@code dataNodeId} is a unique id generated by config node when this data node is
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index a9a627d299..53a52bd940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -106,6 +106,15 @@ public class QueryStateMachine {
queryState.set(QueryState.CANCELED);
}
+ public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
+ if (queryState.get().isDone()) {
+ return;
+ }
+ this.failureException = throwable;
+ this.failureStatus = failureStatus;
+ queryState.set(QueryState.CANCELED);
+ }
+
public void transitionToAborted() {
if (queryState.get().isDone()) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index 1d9f464667..1dae431f10 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -35,6 +35,8 @@ public interface IQueryExecution {
void stopAndCleanup();
+ void cancel();
+
ExecutionResult getStatus();
Optional<TsBlock> getBatchResult() throws IoTDBException;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index d27480f3a4..e8e2afa40c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.KilledByOthersException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -339,6 +340,14 @@ public class QueryExecution implements IQueryExecution {
releaseResource();
}
+ @Override
+ public void cancel() {
+ stateMachine.transitionToCanceled(
+ new KilledByOthersException(),
+ new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode())
+ .setMessage(KilledByOthersException.MESSAGE));
+ }
+
/** Release the resources that current QueryExecution hold. */
private void releaseResource() {
// close ResultHandle to unblock client's getResult request
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index 8762cd748e..6aca2d96fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -150,6 +150,11 @@ public class ConfigExecution implements IQueryExecution {
@Override
public void stopAndCleanup() {}
+ @Override
+ public void cancel() {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
@Override
public ExecutionResult getStatus() {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 0535e1ef1a..14d8fea3d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.UnsetSche
import org.apache.iotdb.db.mpp.plan.execution.config.sys.AuthorizerTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.ClearCacheTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.FlushTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.KillQueryTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.LoadConfigurationTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.MergeTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.SetSystemStatusTask;
@@ -108,6 +109,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTempl
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ClearCacheStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
@@ -221,6 +223,11 @@ public class ConfigTaskVisitor
return new SetSystemStatusTask(setSystemStatusStatement);
}
+ @Override
+ public IConfigTask visitKillQuery(KillQueryStatement killQueryStatement, TaskContext context) {
+ return new KillQueryTask(killQueryStatement);
+ }
+
@Override
public IConfigTask visitCreateFunction(
CreateFunctionStatement createFunctionStatement, TaskContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f406eca790..4790ef2149 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
@@ -128,6 +129,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -767,6 +769,38 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> killQuery(KillQueryStatement killQueryStatement) {
+ int dataNodeId = -1;
+ String queryId = killQueryStatement.getQueryId();
+ if (!killQueryStatement.isKillAll()) {
+ String[] splits = queryId.split("_");
+ try {
+ // Here we only check that the input queryId splits into four '_'-separated parts and that
+ // the DataNodeId parsed from the last part is non-negative
+ if (splits.length != 4 || ((dataNodeId = Integer.parseInt(splits[3])) < 0)) {
+ throw new SemanticException("Please ensure your input <queryId> is correct");
+ }
+ } catch (NumberFormatException e) {
+ throw new SemanticException("Please ensure your input <queryId> is correct");
+ }
+ }
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ final TSStatus executionStatus = client.killQuery(queryId, dataNodeId);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+ LOGGER.warn("Failed to kill query [{}], because {}", queryId, executionStatus.message);
+ future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> showCluster(ShowClusterStatement showClusterStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 562a7aa7a0..ba75d3cadd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -96,6 +97,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> setSystemStatus(boolean onCluster, NodeStatus status);
+ SettableFuture<ConfigTaskResult> killQuery(KillQueryStatement killQueryStatement);
+
SettableFuture<ConfigTaskResult> showCluster(ShowClusterStatement showClusterStatement);
SettableFuture<ConfigTaskResult> showClusterParameters();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/KillQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/KillQueryTask.java
new file mode 100644
index 0000000000..0bbe07bef1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/KillQueryTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.sys;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class KillQueryTask implements IConfigTask {
+
+ private final KillQueryStatement killQueryStatement;
+
+ public KillQueryTask(KillQueryStatement killQueryStatement) {
+ this.killQueryStatement = killQueryStatement;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.killQuery(killQueryStatement);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 30a6d2f173..919b943b8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -142,6 +142,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ClearCacheStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
@@ -2501,6 +2502,15 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return setSystemStatusStatement;
}
+ // Kill Query
+ @Override
+ public Statement visitKillQuery(IoTDBSqlParser.KillQueryContext ctx) {
+ if (ctx.queryId != null) {
+ return new KillQueryStatement(parseStringLiteral(ctx.queryId.getText()));
+ }
+ return new KillQueryStatement();
+ }
+
// show query processlist
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 33a3b28057..280a0cf0cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -82,6 +82,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ClearCacheStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
@@ -324,6 +325,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(setSystemStatusStatement, context);
}
+ /**
+  * Visits a {@link KillQueryStatement}. This base implementation simply forwards to
+  * {@code visitStatement} with the same statement and context.
+  */
+ public R visitKillQuery(KillQueryStatement killQueryStatement, C context) {
+ return visitStatement(killQueryStatement, context);
+ }
+
public R visitShowQueries(ShowQueriesStatement showQueriesStatement, C context) {
return visitStatement(showQueriesStatement, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/KillQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/KillQueryStatement.java
new file mode 100644
index 0000000000..2b4b75de8f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/KillQueryStatement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Statement object for the KILL QUERY command.
+ *
+ * <p>Carries the id of the query to kill. A {@code null} id means no specific query was named,
+ * which {@link #isKillAll()} reports as a request to kill all queries.
+ */
+public class KillQueryStatement extends Statement implements IConfigStatement {
+
+  /** Id of the query to kill, or {@code null} when every query is targeted. */
+  private final String queryId;
+
+  /** Creates a kill-all statement, i.e. one whose query id is {@code null}. */
+  public KillQueryStatement() {
+    this(null);
+  }
+
+  /**
+   * Creates a statement targeting a single query.
+   *
+   * @param queryId id of the query to kill
+   */
+  public KillQueryStatement(String queryId) {
+    this.queryId = queryId;
+  }
+
+  /** @return the id of the query to kill, or {@code null} for a kill-all statement */
+  public String getQueryId() {
+    return queryId;
+  }
+
+  /** @return {@code true} when no query id was supplied */
+  public boolean isKillAll() {
+    return null == queryId;
+  }
+
+  /** @return {@link QueryType#WRITE} */
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  /** @return an empty list; this statement references no paths */
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  /** Dispatches to {@link StatementVisitor#visitKillQuery}. */
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitKillQuery(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d1a8d0c8cd..956b4be9f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1132,6 +1132,25 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ /**
+  * Cancels query executions tracked by the {@link Coordinator}.
+  *
+  * @param queryId id of the query to cancel; {@code null} cancels every tracked execution
+  * @return {@code SUCCESS_STATUS} after cancelling, or {@code NO_SUCH_QUERY} (message
+  *     "No such query") when a non-null id matches no tracked execution
+  */
+ @Override
+ public TSStatus killQueryInstance(String queryId) {
+   final Coordinator queryCoordinator = Coordinator.getInstance();
+
+   // Kill-all request: cancel everything and report success.
+   if (queryId == null) {
+     queryCoordinator.getAllQueryExecutions().forEach(IQueryExecution::cancel);
+     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+   }
+
+   // Targeted request: look up the execution whose id equals the requested one.
+   final Optional<IQueryExecution> target =
+       queryCoordinator.getAllQueryExecutions().stream()
+           .filter(execution -> execution.getQueryId().equals(queryId))
+           .findAny();
+   if (!target.isPresent()) {
+     return new TSStatus(TSStatusCode.NO_SUCH_QUERY.getStatusCode()).setMessage("No such query");
+   }
+
+   target.get().cancel();
+   return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
@Override
public TSStatus setTTL(TSetTTLReq req) throws TException {
return storageEngine.setTTL(req);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index 9a59a462e6..58dd3932aa 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -1670,6 +1670,9 @@ public class MergeSortOperatorTest {
@Override
public void stopAndCleanup() {}
+ @Override
+ public void cancel() {} // no-op
+
@Override
public ExecutionResult getStatus() {
return null;
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 43caa31b52..dd08028155 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -105,6 +105,8 @@ public enum TSStatusCode {
TSBLOCK_SERIALIZE_ERROR(711),
INTERNAL_REQUEST_TIME_OUT(712),
INTERNAL_REQUEST_RETRY_ERROR(713),
+ NO_SUCH_QUERY(714),
+ QUERY_WAS_KILLED(715),
// Authentication
INIT_AUTH_ERROR(800),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 9cdd54d33c..b8dd2ee758 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -1066,6 +1066,9 @@ service IConfigNodeRPCService {
/** Migrate a region replica from one dataNode to another */
common.TSStatus migrateRegion(TMigrateRegionReq req)
+ /**
+ * Kill query.
+ * NOTE(review): presumably queryId names the query to kill and dataNodeId selects the DataNode
+ * that runs it; confirm against the ConfigNode implementation, which is not shown here.
+ */
+ common.TSStatus killQuery(string queryId, i32 dataNodeId)
+
/** Get all DataNodeLocations of Running DataNodes */
TGetDataNodeLocationsResp getRunningDataNodeLocations()
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index ee5d39a2ee..64c3e416c5 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -617,6 +617,8 @@ service IDataNodeRPCService {
common.TSStatus setSystemStatus(string status)
+ /**
+ * Cancel the query execution with the given id on the receiving data node; a null queryId
+ * cancels all tracked executions. Returns NO_SUCH_QUERY when no execution has that id.
+ */
+ common.TSStatus killQueryInstance(string queryId)
+
/**
* Config node will Set the TTL for the database on a list of data nodes.
*/