You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/06 01:48:31 UTC

[iotdb] branch master updated: [IOTDB-4817] Support kill query in cluster

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e98062c1f [IOTDB-4817] Support kill query in cluster
6e98062c1f is described below

commit 6e98062c1fcdc95534f195d4d9b3a9e0647d3ad2
Author: Weihao Li <60...@users.noreply.github.com>
AuthorDate: Fri Jan 6 09:48:26 2023 +0800

    [IOTDB-4817] Support kill query in cluster
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  2 +-
 .../confignode/client/DataNodeRequestType.java     |  5 +-
 .../client/async/AsyncDataNodeClientPool.java      |  6 ++
 .../client/async/handlers/AsyncClientHandler.java  |  1 +
 .../client/sync/SyncDataNodeClientPool.java        |  2 +
 .../iotdb/confignode/manager/ConfigManager.java    |  8 +++
 .../apache/iotdb/confignode/manager/IManager.java  |  2 +
 .../iotdb/confignode/manager/node/NodeManager.java | 38 +++++++++++++
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  5 ++
 .../Maintenance-Tools/Maintenance-Command.md       | 14 ++++-
 docs/UserGuide/Reference/Status-Codes.md           |  2 +
 .../Maintenance-Tools/Maintenance-Command.md       | 16 +++++-
 docs/zh/UserGuide/Reference/Status-Codes.md        |  2 +
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 21 +++++++
 .../query/KilledByOthersException.java}            | 47 +++-------------
 .../iotdb/db/mpp/execution/QueryIdGenerator.java   |  2 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  9 +++
 .../db/mpp/plan/execution/IQueryExecution.java     |  2 +
 .../db/mpp/plan/execution/QueryExecution.java      |  9 +++
 .../mpp/plan/execution/config/ConfigExecution.java |  5 ++
 .../plan/execution/config/ConfigTaskVisitor.java   |  7 +++
 .../config/executor/ClusterConfigTaskExecutor.java | 34 ++++++++++++
 .../config/executor/IConfigTaskExecutor.java       |  3 +
 .../plan/execution/config/sys/KillQueryTask.java   | 42 ++++++++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 10 ++++
 .../db/mpp/plan/statement/StatementVisitor.java    |  5 ++
 .../mpp/plan/statement/sys/KillQueryStatement.java | 64 ++++++++++++++++++++++
 .../impl/DataNodeInternalRPCServiceImpl.java       | 19 +++++++
 .../execution/operator/MergeSortOperatorTest.java  |  3 +
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  2 +
 .../src/main/thrift/confignode.thrift              |  3 +
 thrift/src/main/thrift/datanode.thrift             |  2 +
 32 files changed, 346 insertions(+), 46 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index ba9b4d6d8f..0d6a2cc46f 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -741,7 +741,7 @@ showQueries
 
 // Kill Query
 killQuery
-    : KILL QUERY INTEGER_LITERAL?
+    : KILL (QUERY queryId=STRING_LITERAL | ALL QUERIES)
     ;
 
 // Grant Watermark Embedding
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index ff236ce339..335a818574 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -85,5 +85,8 @@ public enum DataNodeRequestType {
   CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
   ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
   DEACTIVATE_TEMPLATE,
-  COUNT_PATHS_USING_TEMPLATE
+  COUNT_PATHS_USING_TEMPLATE,
+
+  /** @TODO Need to migrate to 'Node Maintenance' */
+  KILL_QUERY_INSTANCE
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 7f0404cc69..097753ccbd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -324,6 +324,12 @@ public class AsyncDataNodeClientPool {
               (CountPathsUsingTemplateRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
           break;
+        case KILL_QUERY_INSTANCE:
+          client.killQueryInstance(
+              (String) clientHandler.getRequest(requestId),
+              (AsyncTSStatusRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, targetDataNode));
+          break;
         default:
           LOGGER.error(
               "Unexpected DataNode Request Type: {} when sendAsyncRequestToDataNode",
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 43b8ad82e4..a5b9422ac8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -215,6 +215,7 @@ public class AsyncClientHandler<Q, R> {
       case INVALIDATE_MATCHED_SCHEMA_CACHE:
       case UPDATE_TEMPLATE:
       case CHANGE_REGION_LEADER:
+      case KILL_QUERY_INSTANCE:
       default:
         return new AsyncTSStatusRPCHandler(
             requestType,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 3136e05da1..dfa02ed192 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -121,6 +121,8 @@ public class SyncDataNodeClientPool {
         return client.stopDataNode();
       case SET_SYSTEM_STATUS:
         return client.setSystemStatus((String) req);
+      case KILL_QUERY_INSTANCE:
+        return client.killQueryInstance((String) req);
       case UPDATE_TEMPLATE:
         return client.updateTemplate((TUpdateTemplateReq) req);
       case CREATE_NEW_REGION_PEER:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a5ea78ed40..47ece8115c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1107,6 +1107,14 @@ public class ConfigManager implements IManager {
         : status;
   }
 
+  @Override
+  public TSStatus killQuery(String queryId, int dataNodeId) {
+    // Delegate to nodeManager only when confirmLeader() reports success; else return its status.
+    final TSStatus leaderCheck = confirmLeader();
+    final boolean confirmed = leaderCheck.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    return confirmed ? nodeManager.killQuery(queryId, dataNodeId) : leaderCheck;
+  }
+
   @Override
   public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
     TSStatus status = confirmLeader();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index c6d38aab6d..913e7bb105 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -411,6 +411,8 @@ public interface IManager {
   /** TestOnly. Set the target DataNode to the specified status */
   TSStatus setDataNodeStatus(TSetDataNodeStatusReq req);
 
+  TSStatus killQuery(String queryId, int dataNodeId);
+
   TGetDataNodeLocationsResp getRunningDataNodeLocations();
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 866ceeab96..1c4c5bb401 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -674,6 +674,44 @@ public class NodeManager {
             DataNodeRequestType.SET_SYSTEM_STATUS);
   }
 
+  /**
+   * Kills a single query on one DataNode, or all queries on all registered DataNodes.
+   *
+   * @param queryId id of the query to kill; unused (and may be null) when killing all queries
+   * @param dataNodeId id of the DataNode the kill request is sent to; any negative value means
+   *     kill all queries on all DataNodes
+   */
+  public TSStatus killQuery(String queryId, int dataNodeId) {
+    if (dataNodeId < 0) {
+      return killAllQueries();
+    } else {
+      return killSpecificQuery(queryId, getRegisteredDataNodeLocations().get(dataNodeId));
+    }
+  }
+
+  /** Sends a kill request to every registered DataNode and squashes the returned statuses. */
+  private TSStatus killAllQueries() {
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = getRegisteredDataNodeLocations();
+    AsyncClientHandler<String, TSStatus> clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.KILL_QUERY_INSTANCE, dataNodeLocationMap);
+    AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
+  }
+
+  /** Forwards the kill request to the given DataNode, or returns an error status if it is null. */
+  private TSStatus killSpecificQuery(String queryId, TDataNodeLocation dataNodeLocation) {
+    if (dataNodeLocation == null) {
+      TSStatus notFound = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      return notFound.setMessage(
+          "The target DataNode is not existed, please ensure your input <queryId> is correct");
+    }
+    return SyncDataNodeClientPool.getInstance()
+        .sendSyncRequestToDataNodeWithRetry(
+            dataNodeLocation.getInternalEndPoint(),
+            queryId,
+            DataNodeRequestType.KILL_QUERY_INSTANCE);
+  }
+
   /** Start the heartbeat service */
   public void startHeartbeatService() {
     synchronized (scheduleMonitor) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index f2973db275..f3091c0066 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -643,6 +643,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return configManager.setDataNodeStatus(req);
   }
 
+  @Override
+  public TSStatus killQuery(String queryId, int dataNodeId) {
+    return configManager.killQuery(queryId, dataNodeId);
+  }
+
   @Override
   public TGetDataNodeLocationsResp getRunningDataNodeLocations() {
     return configManager.getRunningDataNodeLocations();
diff --git a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
index 11ffcdd7a0..da03e2fd66 100644
--- a/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -95,14 +95,24 @@ session.executeQueryStatement(String sql, long timeout)
 
 In addition to waiting for the query to time out passively, IoTDB also supports stopping the query actively:
 
+#### Kill specific query
+
 ```sql
 KILL QUERY <queryId>
 ```
 
-You can abort the specified query by specifying `queryId`. If `queryId` is not specified, all executing queries will be killed.
+You can kill the specified query by specifying `queryId`.
 
 To get the executing `queryId`,you can use the [show queries](#show-queries) command, which will show the list of all executing queries.
 
+#### Kill all queries
+
+```sql
+KILL ALL QUERIES
+```
+
+Kill all queries on all DataNodes.
+
 ## SHOW QUERIES
 
 This command is used to display all ongoing queries, here are usage scenarios:
@@ -164,7 +174,7 @@ SQL result:
 
 SQL string:
 ```SQL
-SHOW QUERIES ORDER BY Time DESC limit 5
+SHOW QUERIES limit 5
 ```
 
 SQL result:
diff --git a/docs/UserGuide/Reference/Status-Codes.md b/docs/UserGuide/Reference/Status-Codes.md
index 57b97f79b2..c770b30c20 100644
--- a/docs/UserGuide/Reference/Status-Codes.md
+++ b/docs/UserGuide/Reference/Status-Codes.md
@@ -107,6 +107,8 @@ Here is a list of Status Code and related message:
 | 711         | TSBLOCK_SERIALIZE_ERROR           | TsBlock serialization error                                                               |
 | 712         | INTERNAL_REQUEST_TIME_OUT         | MPP Operation timeout                                                                     |
 | 713         | INTERNAL_REQUEST_RETRY_ERROR      | Internal operation retry failed                                                           |
+| 714         | NO_SUCH_QUERY                     | Cannot find target query                                                                  |
+| 715         | QUERY_WAS_KILLED                  | Query killed during execution                                                             |
 | 800         | UNINITIALIZED_AUTH_ERROR          | Failed to initialize auth module                                                          |
 | 801         | WRONG_LOGIN_PASSWORD              | Username or password is wrong                                                             |
 | 802         | NOT_LOGIN                         | Not login                                                                                 |
diff --git a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
index d31fcfe19f..2cc895eba2 100644
--- a/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
+++ b/docs/zh/UserGuide/Maintenance-Tools/Maintenance-Command.md
@@ -91,16 +91,26 @@ session.executeQueryStatement(String sql, long timeout)
 
 ### 查询终止
 
-除了被动地等待查询超时外,IoTDB 还支持主动地终止查询,命令为:
+除了被动地等待查询超时外,IoTDB 还支持主动地终止查询:
+
+#### 终止指定查询
 
 ```sql
 KILL QUERY <queryId>
 ```
 
-通过指定 `queryId` 可以中止指定的查询,而如果不指定 `queryId`,将中止所有正在执行的查询。
+通过指定 `queryId` 可以中止指定的查询。
 
 为了获取正在执行的查询 id,用户可以使用 [show queries](#show-queries) 命令,该命令将显示所有正在执行的查询列表。
 
+#### 终止所有查询
+
+```sql
+KILL ALL QUERIES
+```
+
+终止所有DataNode上的所有查询。
+
 ## SHOW QUERIES
 
 该命令用于显示所有正在执行的查询,有以下使用场景:
@@ -162,7 +172,7 @@ SHOW QUERIES WHERE ElapsedTime > 30
 
 SQL 语句为:
 ```SQL
-SHOW QUERIES ORDER BY Time DESC limit 5
+SHOW QUERIES limit 5
 ```
 
 该 SQL 语句的执行结果如下:
diff --git a/docs/zh/UserGuide/Reference/Status-Codes.md b/docs/zh/UserGuide/Reference/Status-Codes.md
index 6387647bbc..9f4cd2c5e0 100644
--- a/docs/zh/UserGuide/Reference/Status-Codes.md
+++ b/docs/zh/UserGuide/Reference/Status-Codes.md
@@ -108,6 +108,8 @@ try {
 | 711    | TSBLOCK_SERIALIZE_ERROR           | TsBlock 序列化错误                        |
 | 712    | INTERNAL_REQUEST_TIME_OUT         | MPP 操作超时                              |
 | 713    | INTERNAL_REQUEST_RETRY_ERROR      | 内部操作重试失败                          |
+| 714    | NO_SUCH_QUERY                     | 查询不存在                               |
+| 715    | QUERY_WAS_KILLED                  | 查询执行时被终止                          |
 | 800    | UNINITIALIZED_AUTH_ERROR          | 授权模块未初始化                          |
 | 801    | WRONG_LOGIN_PASSWORD              | 用户名或密码错误                          |
 | 802    | NOT_LOGIN                         | 没有登录                                  |
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index a0964e739d..89cb68600f 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -1039,6 +1039,27 @@ public class ConfigNodeClient
     throw new TException("DataNode to ConfigNode client doesn't support setDataNodeStatus.");
   }
 
+  @Override
+  public TSStatus killQuery(String queryId, int dataNodeId) throws TException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        TSStatus status = client.killQuery(queryId, dataNodeId);
+        if (!updateConfigNodeLeader(status)) {
+          return status;
+        }
+      } catch (TException e) {
+        logger.warn(
+            "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+            configNode,
+            config.getAddressAndPort(),
+            Thread.currentThread().getStackTrace()[1].getMethodName());
+        configLeader = null;
+      }
+      waitAndReconnect();
+    }
+    throw new TException(MSG_RECONNECTION_FAIL);
+  }
+
   @Override
   public TGetDataNodeLocationsResp getRunningDataNodeLocations() throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/exception/query/KilledByOthersException.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
copy to server/src/main/java/org/apache/iotdb/db/exception/query/KilledByOthersException.java
index 1d9f464667..3843c92eae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/KilledByOthersException.java
@@ -15,49 +15,20 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
  */
 
-package org.apache.iotdb.db.mpp.plan.execution;
+package org.apache.iotdb.db.exception.query;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
-import org.apache.iotdb.db.mpp.plan.statement.Statement;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import java.nio.ByteBuffer;
-import java.util.Optional;
-
-public interface IQueryExecution {
-
-  void start();
-
-  void stop();
-
-  void stopAndCleanup();
-
-  ExecutionResult getStatus();
-
-  Optional<TsBlock> getBatchResult() throws IoTDBException;
-
-  Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException;
-
-  boolean hasNextResult();
-
-  int getOutputValueColumnCount();
-
-  DatasetHeader getDatasetHeader();
-
-  boolean isQuery();
-
-  String getQueryId();
-
-  long getStartExecutionTime();
-
-  void recordExecutionTime(long executionTime);
+import org.apache.iotdb.rpc.TSStatusCode;
 
-  long getTotalExecutionTime();
+public class KilledByOthersException extends IoTDBException {
+  private static final long serialVersionUID = -6027957067833327712L;
 
-  Optional<String> getExecuteSQL();
+  public static final String MESSAGE = "Query was killed by others";
 
-  Statement getStatement();
+  public KilledByOthersException() {
+    super(MESSAGE, TSStatusCode.QUERY_WAS_KILLED.getStatusCode(), true);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
index bc814391d5..49ad488d08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
@@ -66,7 +66,7 @@ public class QueryIdGenerator {
   }
 
   /**
-   * Generate next queryId using the following format: {@code YYYYMMDD_hhmmss_index_dataNodeId}
+   * Generate next queryId using the following format: {@code yyyyMMdd_HHmmss_index_dataNodeId}
    *
    * <p>{@code index} rolls at the start of every day or when it is close to reaching {@code
    * 99,999}. {@code dataNodeId} is a unique id generated by config node when this data node is
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index a9a627d299..53a52bd940 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -106,6 +106,15 @@ public class QueryStateMachine {
     queryState.set(QueryState.CANCELED);
   }
 
+  /** Stores the failure cause and status and sets the state to CANCELED; no-op if already done. */
+  public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
+    if (!queryState.get().isDone()) {
+      this.failureException = throwable;
+      this.failureStatus = failureStatus;
+      queryState.set(QueryState.CANCELED);
+    }
+  }
+
   public void transitionToAborted() {
     if (queryState.get().isDone()) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index 1d9f464667..1dae431f10 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -35,6 +35,8 @@ public interface IQueryExecution {
 
   void stopAndCleanup();
 
+  void cancel();
+
   ExecutionResult getStatus();
 
   Optional<TsBlock> getBatchResult() throws IoTDBException;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index d27480f3a4..e8e2afa40c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.KilledByOthersException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -339,6 +340,14 @@ public class QueryExecution implements IQueryExecution {
     releaseResource();
   }
 
+  @Override
+  public void cancel() {
+    // Cancel through the state machine, tagging the query with the QUERY_WAS_KILLED status.
+    TSStatus killedStatus = new TSStatus(TSStatusCode.QUERY_WAS_KILLED.getStatusCode());
+    killedStatus.setMessage(KilledByOthersException.MESSAGE);
+    stateMachine.transitionToCanceled(new KilledByOthersException(), killedStatus);
+  }
+
   /** Release the resources that current QueryExecution hold. */
   private void releaseResource() {
     // close ResultHandle to unblock client's getResult request
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index 8762cd748e..6aca2d96fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -150,6 +150,11 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public void stopAndCleanup() {}
 
+  @Override
+  public void cancel() {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
   @Override
   public ExecutionResult getStatus() {
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 0535e1ef1a..14d8fea3d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.UnsetSche
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.AuthorizerTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.ClearCacheTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.FlushTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.sys.KillQueryTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.LoadConfigurationTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.MergeTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.SetSystemStatusTask;
@@ -108,6 +109,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTempl
 import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ClearCacheStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
@@ -221,6 +223,11 @@ public class ConfigTaskVisitor
     return new SetSystemStatusTask(setSystemStatusStatement);
   }
 
+  @Override
+  public IConfigTask visitKillQuery(KillQueryStatement killQueryStatement, TaskContext context) {
+    return new KillQueryTask(killQueryStatement);
+  }
+
   @Override
   public IConfigTask visitCreateFunction(
       CreateFunctionStatement createFunctionStatement, TaskContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index f406eca790..4790ef2149 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -128,6 +129,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -767,6 +769,38 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
     return future;
   }
 
+  @Override
+  public SettableFuture<ConfigTaskResult> killQuery(KillQueryStatement killQueryStatement) {
+    int dataNodeId = -1;
+    String queryId = killQueryStatement.getQueryId();
+    if (!killQueryStatement.isKillAll()) {
+      String[] splits = queryId.split("_");
+      try {
+        // Shallow validation only: the id must split on '_' into exactly four parts, and the
+        // last part must parse as a non-negative DataNodeId.
+        if (splits.length != 4 || ((dataNodeId = Integer.parseInt(splits[3])) < 0)) {
+          throw new SemanticException("Please ensure your input <queryId> is correct");
+        }
+      } catch (NumberFormatException e) {
+        throw new SemanticException("Please ensure your input <queryId> is correct");
+      }
+    }
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+      final TSStatus executionStatus = client.killQuery(queryId, dataNodeId);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+        LOGGER.warn("Failed to kill query [{}], because {}", queryId, executionStatus.message);
+        future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (ClientManagerException | TException e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> showCluster(ShowClusterStatement showClusterStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 562a7aa7a0..ba75d3cadd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowNodesInSchem
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathSetTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowSchemaTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.DropPipeSinkStatement;
@@ -96,6 +97,8 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> setSystemStatus(boolean onCluster, NodeStatus status);
 
+  SettableFuture<ConfigTaskResult> killQuery(KillQueryStatement killQueryStatement);
+
   SettableFuture<ConfigTaskResult> showCluster(ShowClusterStatement showClusterStatement);
 
   SettableFuture<ConfigTaskResult> showClusterParameters();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/KillQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/KillQueryTask.java
new file mode 100644
index 0000000000..0bbe07bef1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/sys/KillQueryTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.sys;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/** Config task that hands a {@link KillQueryStatement} to the {@link IConfigTaskExecutor}. */
+public class KillQueryTask implements IConfigTask {
+  private final KillQueryStatement statement;
+
+  public KillQueryTask(KillQueryStatement killQueryStatement) {
+    this.statement = killQueryStatement;
+  }
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.killQuery(statement);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 30a6d2f173..919b943b8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -142,6 +142,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ClearCacheStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
@@ -2501,6 +2502,15 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     return setSystemStatusStatement;
   }
 
+  // Kill Query: no queryId in the parse context yields the no-arg KillQueryStatement
+  @Override
+  public Statement visitKillQuery(IoTDBSqlParser.KillQueryContext ctx) {
+    if (ctx.queryId == null) {
+      return new KillQueryStatement();
+    }
+    return new KillQueryStatement(parseStringLiteral(ctx.queryId.getText()));
+  }
+
   // show query processlist
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index 33a3b28057..280a0cf0cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -82,6 +82,7 @@ import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ClearCacheStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.ExplainStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.FlushStatement;
+import org.apache.iotdb.db.mpp.plan.statement.sys.KillQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.LoadConfigurationStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.MergeStatement;
 import org.apache.iotdb.db.mpp.plan.statement.sys.SetSystemStatusStatement;
@@ -324,6 +325,10 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(setSystemStatusStatement, context);
   }
 
+  public R visitKillQuery(KillQueryStatement killQueryStatement, C context) {
+    return visitStatement(killQueryStatement, context); // default: defer to the generic handler
+  }
+
   public R visitShowQueries(ShowQueriesStatement showQueriesStatement, C context) {
     return visitStatement(showQueriesStatement, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/KillQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/KillQueryStatement.java
new file mode 100644
index 0000000000..2b4b75de8f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/KillQueryStatement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.sys;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Statement of a kill-query command; a null query id means "kill all" (see isKillAll). */
+public class KillQueryStatement extends Statement implements IConfigStatement {
+  private final String queryId;
+
+  public KillQueryStatement() {
+    this(null);
+  }
+  public KillQueryStatement(String queryId) {
+    this.queryId = queryId;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  public boolean isKillAll() {
+    return queryId == null;
+  }
+
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  @Override
+  public List<? extends PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitKillQuery(this, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d1a8d0c8cd..956b4be9f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1132,6 +1132,25 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  /** Cancels the query with the given id, or all of the Coordinator's queries when it is null. */
+  @Override
+  public TSStatus killQueryInstance(String queryId) {
+    Coordinator queryCoordinator = Coordinator.getInstance();
+    if (queryId != null) {
+      Optional<IQueryExecution> target =
+          queryCoordinator.getAllQueryExecutions().stream()
+              .filter(execution -> execution.getQueryId().equals(queryId))
+              .findAny();
+      if (!target.isPresent()) {
+        return new TSStatus(TSStatusCode.NO_SUCH_QUERY.getStatusCode()).setMessage("No such query");
+      }
+      target.get().cancel();
+    } else {
+      queryCoordinator.getAllQueryExecutions().forEach(IQueryExecution::cancel);
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   @Override
   public TSStatus setTTL(TSetTTLReq req) throws TException {
     return storageEngine.setTTL(req);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
index 9a59a462e6..58dd3932aa 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -1670,6 +1670,9 @@ public class MergeSortOperatorTest {
     @Override
     public void stopAndCleanup() {}
 
+    @Override
+    public void cancel() {} // no-op, same as stopAndCleanup()
+
     @Override
     public ExecutionResult getStatus() {
       return null;
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 43caa31b52..dd08028155 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -105,6 +105,8 @@ public enum TSStatusCode {
   TSBLOCK_SERIALIZE_ERROR(711),
   INTERNAL_REQUEST_TIME_OUT(712),
   INTERNAL_REQUEST_RETRY_ERROR(713),
+  NO_SUCH_QUERY(714),
+  QUERY_WAS_KILLED(715),
 
   // Authentication
   INIT_AUTH_ERROR(800),
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 9cdd54d33c..b8dd2ee758 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -1066,6 +1066,9 @@ service IConfigNodeRPCService {
   /** Migrate a region replica from one dataNode to another */
   common.TSStatus migrateRegion(TMigrateRegionReq req)
 
+  /** Kill query */
+  common.TSStatus killQuery(string queryId, i32 dataNodeId)
+
   /** Get all DataNodeLocations of Running DataNodes */
   TGetDataNodeLocationsResp getRunningDataNodeLocations()
 
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index ee5d39a2ee..64c3e416c5 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -617,6 +617,8 @@ service IDataNodeRPCService {
 
   common.TSStatus setSystemStatus(string status)
 
+  common.TSStatus killQueryInstance(string queryId)
+
   /**
    * Config node will Set the TTL for the database on a list of data nodes.
    */