You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/01/13 10:37:36 UTC

[iotdb] branch master updated: [IOTDB-965] Add timeout in query (#2352)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 652be96  [IOTDB-965] Add timeout in query (#2352)
652be96 is described below

commit 652be96969c8b35415f3dea51feb37be9108dad3
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Jan 13 18:37:17 2021 +0800

    [IOTDB-965] Add timeout in query (#2352)
---
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |  15 ++
 docs/UserGuide/Client/Status Codes.md              |   2 +
 docs/zh/UserGuide/Client/Status Codes.md           |   2 +
 .../main/java/org/apache/iotdb/SessionExample.java |  20 ++-
 .../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java     |   4 +-
 .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java  |   4 +-
 .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java     |   6 +-
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  29 +++-
 .../resources/conf/iotdb-engine.properties         |   3 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 ++
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   3 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   3 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  11 --
 .../db/exception/QueryIdNotExsitException.java     |  26 +---
 .../query/QueryTimeoutRuntimeException.java        |  32 ++--
 .../org/apache/iotdb/db/mqtt/PublishHandler.java   | 123 +++++++--------
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   1 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   2 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  58 ++++++-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   2 +-
 .../iotdb/db/qp/logical/sys/KillQueryOperator.java |  37 ++---
 .../iotdb/db/qp/physical/sys/KillQueryPlan.java    |  39 ++---
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   2 +-
 .../qp/physical/sys/ShowQueryProcesslistPlan.java  |  26 +---
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  19 +++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  11 +-
 .../db/query/control/QueryResourceManager.java     |   3 +
 .../iotdb/db/query/control/QueryTimeManager.java   | 172 +++++++++++++++++++++
 .../db/query/dataset/NonAlignEngineDataSet.java    |  22 +++
 .../dataset/RawQueryDataSetWithoutValueFilter.java |  39 ++++-
 .../db/query/executor/RawDataQueryExecutor.java    |   4 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |  14 ++
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  62 ++++++--
 .../iotdb/db/integration/IOTDBGroupByIT.java       |   1 -
 .../iotdb/db/integration/IoTDBKillQueryTest.java   |  84 ++++++++++
 .../db/integration/IoTDBQueryTimeoutTest.java      | 154 ++++++++++++++++++
 .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java |   7 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../main/java/org/apache/iotdb/session/Config.java |   3 +-
 .../java/org/apache/iotdb/session/Session.java     |  21 ++-
 .../apache/iotdb/session/SessionConnection.java    |   9 +-
 .../org/apache/iotdb/session/SessionDataSet.java   |  12 +-
 .../iotdb/session/IoTDBSessionIteratorIT.java      |  29 ++++
 thrift/src/main/thrift/rpc.thrift                  |   3 +
 .../exception/QueryTimeoutRuntimeException.java    |  32 ++--
 .../iotdb/tsfile/read/reader/LocalTsFileInput.java |  20 +++
 47 files changed, 955 insertions(+), 231 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index 849ae49..aa5e874 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -79,6 +79,8 @@ statement
     | SHOW CHILD PATHS prefixPath? #showChildPaths
     | SHOW DEVICES prefixPath? limitClause? #showDevices
     | SHOW MERGE #showMergeStatus
+    | SHOW QUERY PROCESSLIST #showQueryProcesslist
+    | KILL QUERY INT? #killQuery
     | TRACING ON #tracingOn
     | TRACING OFF #tracingOff
     | COUNT TIMESERIES prefixPath? (GROUP BY LEVEL OPERATOR_EQ INT)? #countTimeseries
@@ -714,6 +716,19 @@ SHOW
     : S H O W
     ;
 
+QUERY
+    : Q U E R Y
+    ;
+
+KILL
+    : K I L L
+    ;
+
+PROCESSLIST
+    : P R O C E S S L I S T
+    ;
+
+
 GRANT
     : G R A N T
     ;
diff --git a/docs/UserGuide/Client/Status Codes.md b/docs/UserGuide/Client/Status Codes.md
index 59f7ae9..62f15cb 100644
--- a/docs/UserGuide/Client/Status Codes.md	
+++ b/docs/UserGuide/Client/Status Codes.md	
@@ -80,6 +80,8 @@ Here is a list of Status Code and related message:
 |410|PATH_ERROR|Path related error|
 |411|QUERY_PROCESS_ERROR|Query process related error|
 |412|WRITE_PROCESS_ERROR|Writing data related error|
+|413|WRITE_PROCESS_REJECT|Writing data rejected error|
+|414|QUERY_ID_NOT_EXIST|Kill query with non-existent queryId|
 |500|INTERNAL_SERVER_ERROR|Internal server error|
 |501|CLOSE_OPERATION_ERROR|Meet error in close operation|
 |502|READ_ONLY_SYSTEM_ERROR|Operating system is read only|
diff --git a/docs/zh/UserGuide/Client/Status Codes.md b/docs/zh/UserGuide/Client/Status Codes.md
index e881ec7..dbe552f 100644
--- a/docs/zh/UserGuide/Client/Status Codes.md	
+++ b/docs/zh/UserGuide/Client/Status Codes.md	
@@ -80,6 +80,8 @@ try {
 |410|PATH_ERROR|路径相关错误|
 |411|QUERY_PROCESS_ERROR|查询处理相关错误|
 |412|WRITE_PROCESS_ERROR|写入相关错误|
+|413|WRITE_PROCESS_REJECT|写入拒绝错误|
+|414|QUERY_ID_NOT_EXIST|Query id 不存在|
 |500|INTERNAL_SERVER_ERROR|服务器内部错误|
 |501|CLOSE_OPERATION_ERROR|关闭操作错误|
 |502|READ_ONLY_SYSTEM_ERROR|系统只读|
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 1ab093f..d57d478 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -58,8 +58,9 @@ public class SessionExample {
     try {
       session.setStorageGroup("root.sg1");
     } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode())
+      if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
         throw e;
+      }
     }
 
     createTimeseries();
@@ -70,6 +71,7 @@ public class SessionExample {
     insertRecords();
     nonQuery();
     query();
+    queryWithTimeout();
     rawDataQuery();
     queryByIterator();
     deleteData();
@@ -178,7 +180,8 @@ public class SessionExample {
     }
   }
 
-  private static void insertStrRecord() throws IoTDBConnectionException, StatementExecutionException {
+  private static void insertStrRecord()
+      throws IoTDBConnectionException, StatementExecutionException {
     String deviceId = ROOT_SG1_D1;
     List<String> measurements = new ArrayList<>();
     measurements.add("s1");
@@ -249,6 +252,7 @@ public class SessionExample {
 
     session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
   }
+
   /**
    * insert the data of a device. For each timestamp, the number of measurements is the same.
    *
@@ -431,6 +435,18 @@ public class SessionExample {
     dataSet.closeOperationHandle();
   }
 
+  private static void queryWithTimeout()
+      throws IoTDBConnectionException, StatementExecutionException {
+    SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1", 2000);
+    System.out.println(dataSet.getColumnNames());
+    dataSet.setFetchSize(1024); // default is 10000
+    while (dataSet.hasNext()) {
+      System.out.println(dataSet.next());
+    }
+
+    dataSet.closeOperationHandle();
+  }
+
   private static void rawDataQuery() throws IoTDBConnectionException, StatementExecutionException {
     List<String> paths = new ArrayList<>();
     paths.add(ROOT_SG1_D1_S1);
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java
index 99c300f..259b24b 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/AbstractIoTDBJDBCResultSet.java
@@ -58,11 +58,11 @@ public abstract class AbstractIoTDBJDBCResultSet implements ResultSet {
   public AbstractIoTDBJDBCResultSet(Statement statement, List<String> columnNameList,
       List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
       TSIService.Iface client,
-      String sql, long queryId, long sessionId)
+      String sql, long queryId, long sessionId, long timeout)
       throws SQLException {
     this.ioTDBRpcDataSet = new IoTDBRpcDataSet(sql, columnNameList, columnTypeList,
         columnNameIndex, ignoreTimeStamp, queryId, client, sessionId, null,
-        statement.getFetchSize());
+        statement.getFetchSize(), timeout);
     this.statement = statement;
     this.columnTypeList = columnTypeList;
   }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
index cda5048..84f28ec 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java
@@ -33,10 +33,10 @@ public class IoTDBJDBCResultSet extends AbstractIoTDBJDBCResultSet {
   public IoTDBJDBCResultSet(Statement statement, List<String> columnNameList,
       List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
       TSIService.Iface client,
-      String sql, long queryId, long sessionId, TSQueryDataSet dataset)
+      String sql, long queryId, long sessionId, TSQueryDataSet dataset, long timeout)
       throws SQLException {
     super(statement, columnNameList, columnTypeList, columnNameIndex, ignoreTimeStamp, client, sql,
-        queryId, sessionId);
+        queryId, sessionId, timeout);
     ioTDBRpcDataSet.setTsQueryDataSet(dataset);
   }
 
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
index dd0b513..ad25bd4 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java
@@ -52,10 +52,10 @@ public class IoTDBNonAlignJDBCResultSet extends AbstractIoTDBJDBCResultSet {
   IoTDBNonAlignJDBCResultSet(Statement statement, List<String> columnNameList,
       List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
       TSIService.Iface client,
-      String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset)
+      String sql, long queryId, long sessionId, TSQueryNonAlignDataSet dataset, long timeout)
       throws SQLException {
     super(statement, columnNameList, columnTypeList, columnNameIndex, ignoreTimeStamp, client, sql,
-        queryId, sessionId);
+        queryId, sessionId, timeout);
 
     times = new byte[columnNameList.size()][Long.BYTES];
 
@@ -110,7 +110,7 @@ public class IoTDBNonAlignJDBCResultSet extends AbstractIoTDBJDBCResultSet {
   protected boolean fetchResults() throws SQLException {
     TSFetchResultsReq req = new TSFetchResultsReq(ioTDBRpcDataSet.sessionId,
         ioTDBRpcDataSet.sql, ioTDBRpcDataSet.fetchSize, ioTDBRpcDataSet.queryId,
-        false);
+        false, ioTDBRpcDataSet.timeout);
     try {
       TSFetchResultsResp resp = ioTDBRpcDataSet.client.fetchResults(req);
 
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index d7cc9ec..8dab6e5 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -48,7 +48,11 @@ public class IoTDBStatement implements Statement {
   private ResultSet resultSet = null;
   private IoTDBConnection connection;
   private int fetchSize;
-  private int queryTimeout = 10;
+
+  /**
+   * Query timeout, in seconds.
+   */
+  private int queryTimeout = 60;
   protected TSIService.Iface client;
   private List<String> batchSQLList;
   private static final String NOT_SUPPORT_EXECUTE = "Not support execute";
@@ -223,6 +227,7 @@ public class IoTDBStatement implements Statement {
     isCancelled = false;
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, stmtId);
     execReq.setFetchSize(fetchSize);
+    execReq.setTimeout((long) queryTimeout * 1000);
     TSExecuteStatementResp execResp = client.executeStatement(execReq);
     try {
       RpcUtils.verifySuccess(execResp.getStatus());
@@ -236,11 +241,11 @@ public class IoTDBStatement implements Statement {
       if (execResp.queryDataSet == null) {
         this.resultSet = new IoTDBNonAlignJDBCResultSet(this, execResp.getColumns(),
             execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.ignoreTimeStamp,
-            client, sql, queryId, sessionId, execResp.nonAlignQueryDataSet);
+            client, sql, queryId, sessionId, execResp.nonAlignQueryDataSet, execReq.timeout);
       } else {
         this.resultSet = new IoTDBJDBCResultSet(this, execResp.getColumns(),
             execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.ignoreTimeStamp,
-            client, sql, queryId, sessionId, execResp.queryDataSet);
+            client, sql, queryId, sessionId, execResp.queryDataSet, execReq.timeout);
       }
       return true;
     }
@@ -299,14 +304,21 @@ public class IoTDBStatement implements Statement {
 
   @Override
   public ResultSet executeQuery(String sql) throws SQLException {
+    return this.executeQuery(sql, (long) this.queryTimeout * 1000);
+  }
+
+  public ResultSet executeQuery(String sql, long timeoutInMS) throws SQLException {
     checkConnection("execute query");
+    if (timeoutInMS <= 0) {
+      throw new SQLException("Timeout must be over 0, please check and try again.");
+    }
     isClosed = false;
     try {
-      return executeQuerySQL(sql);
+      return executeQuerySQL(sql, timeoutInMS);
     } catch (TException e) {
       if (reConnect()) {
         try {
-          return executeQuerySQL(sql);
+          return executeQuerySQL(sql, timeoutInMS);
         } catch (TException e2) {
           throw new SQLException(
               "Fail to executeQuery " + sql + "after reconnecting. please check server status", e2);
@@ -319,10 +331,11 @@ public class IoTDBStatement implements Statement {
     }
   }
 
-  private ResultSet executeQuerySQL(String sql) throws TException, SQLException {
+  private ResultSet executeQuerySQL(String sql, long timeoutInMS) throws TException, SQLException {
     isCancelled = false;
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, stmtId);
     execReq.setFetchSize(fetchSize);
+    execReq.setTimeout(timeoutInMS);
     TSExecuteStatementResp execResp = client.executeQueryStatement(execReq);
     queryId = execResp.getQueryId();
     try {
@@ -338,11 +351,11 @@ public class IoTDBStatement implements Statement {
     if (execResp.queryDataSet == null) {
       this.resultSet = new IoTDBNonAlignJDBCResultSet(this, execResp.getColumns(),
           execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.ignoreTimeStamp, client,
-          sql, queryId, sessionId, execResp.nonAlignQueryDataSet);
+          sql, queryId, sessionId, execResp.nonAlignQueryDataSet, execReq.timeout);
     } else {
       this.resultSet = new IoTDBJDBCResultSet(this, execResp.getColumns(),
           execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.ignoreTimeStamp, client,
-          sql, queryId, sessionId, execResp.queryDataSet);
+          sql, queryId, sessionId, execResp.queryDataSet, execReq.timeout);
     }
     return resultSet;
   }
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index fbbba13..8201e9d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -371,6 +371,9 @@ compaction_thread_num=10
 # The limit of write throughput merge can reach per second
 merge_write_throughput_mb_per_sec=8
 
+# The maximum execution time of a query. Unit: ms
+query_time_threshold=60000
+
 ####################
 ### Metadata Cache Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index d0918f1..0871744 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -477,6 +477,11 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /**
+   * The maximum execution time of a query, in ms.
+   */
+  private int queryTimeThreshold = 60000;
+
+  /**
    * Replace implementation class of JDBC service
    */
   private String rpcImplClassName = TSServiceImpl.class.getName();
@@ -1314,6 +1319,14 @@ public class IoTDBConfig {
     this.cacheFileReaderClearPeriod = cacheFileReaderClearPeriod;
   }
 
+  public int getQueryTimeThreshold() {
+    return queryTimeThreshold;
+  }
+
+  public void setQueryTimeThreshold(int queryTimeThreshold) {
+    this.queryTimeThreshold = queryTimeThreshold;
+  }
+
   public boolean isReadOnly() {
     return readOnly;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index edd8607..aac1541 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -72,6 +72,9 @@ public class IoTDBConstant {
   public static final String COLUMN_COUNT = "count";
   public static final String COLUMN_TAGS = "tags";
   public static final String COLUMN_ATTRIBUTES = "attributes";
+  public static final String QUERY_ID = "queryId";
+  public static final String START_TIME = "startTime";
+  public static final String STATEMENT = "statement";
 
   public static final String COLUMN_ROLE = "role";
   public static final String COLUMN_USER = "user";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 460ae25..b11f9d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -320,6 +320,9 @@ public class IoTDBDescriptor {
           .getProperty("unseq_file_num_in_each_level",
               Integer.toString(conf.getUnseqFileNumInEachLevel()))));
 
+      conf.setQueryTimeThreshold(Integer.parseInt(properties
+          .getProperty("query_time_threshold", Integer.toString(conf.getQueryTimeThreshold()))));
+
       conf.setSyncEnable(Boolean
           .parseBoolean(properties.getProperty("is_sync_enable",
               Boolean.toString(conf.isSyncEnable()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 6349e21..30d5d52 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -83,7 +83,6 @@ import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
@@ -394,7 +393,6 @@ public class StorageEngine implements IService {
 
     StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getDeviceId());
 
-    // TODO monitor: update statistics
     try {
       storageGroupProcessor.insert(insertRowPlan);
       if (config.isEnableStatMonitor()) {
@@ -430,7 +428,6 @@ public class StorageEngine implements IService {
           + "failed", insertTabletPlan.getDeviceId()), e);
     }
 
-    // TODO monitor: update statistics
     storageGroupProcessor.insertTablet(insertTabletPlan);
     if (config.isEnableStatMonitor()) {
       updateMonitorStatistics(storageGroupProcessor, insertTabletPlan);
@@ -544,14 +541,6 @@ public class StorageEngine implements IService {
     }
   }
 
-  /**
-   * update data.
-   */
-  public void update(String deviceId, String measurementId, long startTime, long endTime,
-      TSDataType type, String v) {
-    // TODO
-  }
-
   public void delete(PartialPath path, long startTime, long endTime, long planIndex)
       throws StorageEngineException {
     try {
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/exception/QueryIdNotExsitException.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/exception/QueryIdNotExsitException.java
index d900df7..0505722 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/QueryIdNotExsitException.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,26 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.exception;
 
-public class Config {
+import org.apache.iotdb.rpc.TSStatusCode;
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 10000;
-  public static final int DEFAULT_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+public class QueryIdNotExsitException extends IoTDBException {
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
+  public QueryIdNotExsitException(String message) {
+    super(message, TSStatusCode.QUERY_ID_NOT_EXIST.getStatusCode());
+  }
 
-  /**
-   * thrift init buffer size, 1KB by default
-   */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
-
-  /**
-   * thrift max frame size (16384000 bytes by default), we change it to 64MB
-   */
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
index d900df7..d8ba143 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,26 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.exception.query;
 
-public class Config {
+/**
+ * Runtime exception thrown when a query times out.
+ */
+public class QueryTimeoutRuntimeException extends RuntimeException {
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 10000;
-  public static final int DEFAULT_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+  public static final String TIMEOUT_EXCEPTION_MESSAGE
+      = "Current query is time out, please check your statement or modify timeout parameter.";
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
+  public QueryTimeoutRuntimeException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
-  /**
-   * thrift init buffer size, 1KB by default
-   */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+  public QueryTimeoutRuntimeException(String message) {
+    super(message);
+  }
 
-  /**
-   * thrift max frame size (16384000 bytes by default), we change it to 64MB
-   */
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
index 87a2adf..f2f4ea9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -41,79 +40,81 @@ import org.slf4j.LoggerFactory;
  * PublishHandler handle the messages from MQTT clients.
  */
 public class PublishHandler extends AbstractInterceptHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
 
-    private IPlanExecutor executor;
-    private PayloadFormatter payloadFormat;
+  private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
 
-    public PublishHandler(IoTDBConfig config) {
-        this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
-        try {
-            this.executor = new PlanExecutor();
-        } catch (QueryProcessException e) {
-            throw new RuntimeException(e);
-        }
-    }
+  private IPlanExecutor executor;
+  private PayloadFormatter payloadFormat;
 
-    protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
-        this.executor = executor;
-        this.payloadFormat = payloadFormat;
+  public PublishHandler(IoTDBConfig config) {
+    this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
+    try {
+      this.executor = new PlanExecutor();
+    } catch (QueryProcessException e) {
+      throw new RuntimeException(e);
     }
+  }
 
-    @Override
-    public String getID() {
-        return "iotdb-mqtt-broker-listener";
-    }
+  protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
+    this.executor = executor;
+    this.payloadFormat = payloadFormat;
+  }
 
-    @Override
-    public void onPublish(InterceptPublishMessage msg) {
-        String clientId = msg.getClientID();
-        ByteBuf payload = msg.getPayload();
-        String topic = msg.getTopicName();
-        String username = msg.getUsername();
-        MqttQoS qos = msg.getQos();
+  @Override
+  public String getID() {
+    return "iotdb-mqtt-broker-listener";
+  }
 
-        LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
-                clientId, username, qos, topic, payload);
+  @Override
+  public void onPublish(InterceptPublishMessage msg) {
+    String clientId = msg.getClientID();
+    ByteBuf payload = msg.getPayload();
+    String topic = msg.getTopicName();
+    String username = msg.getUsername();
+    MqttQoS qos = msg.getQos();
 
-        List<Message> events = payloadFormat.format(payload);
-        if (events == null) {
-            return;
-        }
+    LOG.debug(
+        "Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
+        clientId, username, qos, topic, payload);
+
+    List<Message> events = payloadFormat.format(payload);
+    if (events == null) {
+      return;
+    }
 
-        // since device ids from messages maybe different, so we use the InsertPlan not InsertTabletPlan.
-        for (Message event : events) {
-            if (event == null) {
-                continue;
-            }
+    // Device ids may differ between messages, so we use InsertRowPlan rather than InsertTabletPlan.
+    for (Message event : events) {
+      if (event == null) {
+        continue;
+      }
 
-            InsertRowPlan plan = new InsertRowPlan();
-            plan.setTime(event.getTimestamp());
-            plan.setMeasurements(event.getMeasurements().toArray(new String[0]));
-            plan.setValues(event.getValues().toArray(new Object[0]));
-            plan.setDataTypes(new TSDataType[event.getValues().size()]);
-            plan.setNeedInferType(true);
+      InsertRowPlan plan = new InsertRowPlan();
+      plan.setTime(event.getTimestamp());
+      plan.setMeasurements(event.getMeasurements().toArray(new String[0]));
+      plan.setValues(event.getValues().toArray(new Object[0]));
+      plan.setDataTypes(new TSDataType[event.getValues().size()]);
+      plan.setNeedInferType(true);
 
-            boolean status = false;
-            try {
-                plan.setDeviceId(new PartialPath(event.getDevice()));
-                status = executeNonQuery(plan);
-            } catch (Exception e) {
-                LOG.warn(
-                    "meet error when inserting device {}, measurements {}, at time {}, because ",
-                    event.getDevice(), event.getMeasurements(), event.getTimestamp(), e);
-            }
+      boolean status = false;
+      try {
+        plan.setDeviceId(new PartialPath(event.getDevice()));
+        status = executeNonQuery(plan);
+      } catch (Exception e) {
+        LOG.warn(
+            "meet error when inserting device {}, measurements {}, at time {}, because ",
+            event.getDevice(), event.getMeasurements(), event.getTimestamp(), e);
+      }
 
-            LOG.debug("event process result: {}", status);
-        }
+      LOG.debug("event process result: {}", status);
     }
+  }
 
-    private boolean executeNonQuery(PhysicalPlan plan)
-        throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
-        if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
-            throw new QueryProcessException(
-                    "Current system mode is read-only, does not support non-query operation");
-        }
-        return executor.processNonQuery(plan);
+  private boolean executeNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
+    if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+      throw new QueryProcessException(
+          "Current system mode is read-only, does not support non-query operation");
     }
+    return executor.processNonQuery(plan);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index d6cd44f..57f3b46 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -174,6 +174,7 @@ public class Planner {
       case SHOW_MERGE_STATUS:
       case DELETE_PARTITION:
       case CREATE_SCHEMA_SNAPSHOT:
+      case KILL:
       case CREATE_FUNCTION:
       case DROP_FUNCTION:
         return operator;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index ecbc667..11776ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -167,6 +167,8 @@ public class SQLConstant {
 
   public static final int TOK_COUNT_DEVICES = 95;
   public static final int TOK_COUNT_STORAGE_GROUP = 96;
+  public static final int TOK_QUERY_PROCESSLIST = 97;
+  public static final int TOK_KILL_QUERY = 98;
 
   public static final Map<Integer, String> tokenSymbol = new HashMap<>();
   public static final Map<Integer, String> tokenNames = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 2a0d7cb..b31d5aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -39,9 +39,11 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
 import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
-import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
 import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
+import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
 
 import java.io.File;
@@ -77,6 +79,7 @@ import org.apache.iotdb.db.engine.merge.manage.MergeManager.TaskStatus;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.QueryIdNotExsitException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.UDFRegistrationException;
 import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
@@ -122,6 +125,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
 import org.apache.iotdb.db.qp.physical.sys.MergePlan;
 import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
@@ -136,6 +140,8 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
+import org.apache.iotdb.db.query.control.QueryTimeManager.QueryInfo;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
 import org.apache.iotdb.db.query.dataset.ShowDevicesDataSet;
@@ -296,6 +302,13 @@ public class PlanExecutor implements IPlanExecutor {
         throw new QueryProcessException("Create index hasn't been supported yet");
       case DROP_INDEX:
         throw new QueryProcessException("Drop index hasn't been supported yet");
+      case KILL:
+        try {
+          operateKillQuery((KillQueryPlan) plan);
+        } catch (QueryIdNotExsitException e) {
+          throw new QueryProcessException(e.getMessage());
+        }
+        return true;
       default:
         throw new UnsupportedOperationException(
             String.format("operation %s is not supported", plan.getOperatorType()));
@@ -332,6 +345,32 @@ public class PlanExecutor implements IPlanExecutor {
     IoTDB.metaManager.createMTreeSnapshot();
   }
 
+  /**
+   * Kills the query whose id is given in the plan, or every running query if the id is -1.
+   *
+   * @throws QueryIdNotExsitException if a specific id is given but no such query is registered
+   */
+  private void operateKillQuery(KillQueryPlan killQueryPlan) throws QueryIdNotExsitException {
+    QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+    long killQueryId = killQueryPlan.getQueryId();
+    if (killQueryId != -1) {
+      // killQuery removes the entry itself, so it must not run inside computeIfPresent:
+      // ConcurrentHashMap forbids modifying the map from within a remapping function
+      if (!queryTimeManager.getQueryInfoMap().containsKey(killQueryId)) {
+        throw new QueryIdNotExsitException(String
+            .format("Query Id %d is not exist, please check it.", killQueryPlan.getQueryId()));
+      }
+      queryTimeManager.killQuery(killQueryId);
+    } else {
+      // if queryId is not specified, kill all running queries; iterate over a snapshot of
+      // the ids because killQuery removes entries from the map
+      List<Long> queryIdList = new ArrayList<>(queryTimeManager.getQueryInfoMap().keySet());
+      for (Long queryId : queryIdList) {
+        queryTimeManager.killQuery(queryId);
+      }
+    }
+  }
+
   private void operateTracing(TracingPlan plan) {
     IoTDBDescriptor.getInstance().getConfig().setEnablePerformanceTracing(plan.isTracingOn());
   }
@@ -455,6 +494,8 @@ public class PlanExecutor implements IPlanExecutor {
         return processCountNodes((CountPlan) showPlan);
       case MERGE_STATUS:
         return processShowMergeStatus();
+      case QUERY_PROCESSLIST:
+        return processShowQueryProcesslist();
       case FUNCTIONS:
         return processShowFunctions((ShowFunctionsPlan) showPlan);
       default:
@@ -1558,6 +1599,21 @@ public class PlanExecutor implements IPlanExecutor {
     return record;
   }
 
+  private QueryDataSet processShowQueryProcesslist() {
+    ListDataSet listDataSet = new ListDataSet(Arrays
+        .asList(new PartialPath(QUERY_ID, false), new PartialPath(STATEMENT, false)),
+        Arrays.asList(TSDataType.INT64, TSDataType.TEXT));
+    QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+    for (Entry<Long, QueryInfo> queryInfo : queryTimeManager.getQueryInfoMap()
+        .entrySet()) {
+      RowRecord record = new RowRecord(queryInfo.getValue().getStartTime());
+      record.addField(queryInfo.getKey(), TSDataType.INT64);
+      record.addField(new Binary(queryInfo.getValue().getStatement()), TSDataType.TEXT);
+      listDataSet.putRecord(record);
+    }
+    return listDataSet;
+  }
+
   /**
    * @param storageGroups the storage groups to check
    * @return List<PartialPath> the storage groups that not exist
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index fad59aa..6c2e49d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -78,7 +78,7 @@ public abstract class Operator {
     ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE, CLEAR_CACHE,
     SHOW_MERGE_STATUS, CREATE_SCHEMA_SNAPSHOT, TRACING, DELETE_PARTITION,
     UDAF, UDTF, CREATE_FUNCTION, DROP_FUNCTION,
-    CREATE_MULTI_TIMESERIES, CREATE_INDEX, DROP_INDEX, QUERY_INDEX,
+    CREATE_MULTI_TIMESERIES, CREATE_INDEX, DROP_INDEX, QUERY_INDEX, KILL,
     CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE,
     MEASUREMENT_MNODE, STORAGE_GROUP_MNODE,
     BATCH_INSERT_ONE_DEVICE;
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/KillQueryOperator.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/qp/logical/sys/KillQueryOperator.java
index d900df7..7634566 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/KillQueryOperator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,26 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.qp.logical.sys;
 
-public class Config {
+import org.apache.iotdb.db.qp.logical.RootOperator;
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 10000;
-  public static final int DEFAULT_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+public class KillQueryOperator extends RootOperator {
+  long queryId = -1;
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
+  public KillQueryOperator(int tokenIntType) {
+    this(tokenIntType, OperatorType.KILL);
+  }
 
-  /**
-   * thrift init buffer size, 1KB by default
-   */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+  public KillQueryOperator(int tokenIntType, OperatorType operatorType) {
+    super(tokenIntType);
+    this.operatorType = operatorType;
+  }
 
-  /**
-   * thrift max frame size (16384000 bytes by default), we change it to 64MB
-   */
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+  public void setQueryId(long queryId) {
+    this.queryId = queryId;
+  }
+
+  public long getQueryId() {
+    return queryId;
+  }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/KillQueryPlan.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/KillQueryPlan.java
index d900df7..f6fa6ad 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/KillQueryPlan.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,26 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.qp.physical.sys;
 
-public class Config {
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 10000;
-  public static final int DEFAULT_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+public class KillQueryPlan extends PhysicalPlan {
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
+  private long queryId = -1;
 
-  /**
-   * thrift init buffer size, 1KB by default
-   */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+  public KillQueryPlan(long queryId) {
+    super(false, OperatorType.KILL);
+    this.queryId = queryId;
+  }
 
-  /**
-   * thrift max frame size (16384000 bytes by default), we change it to 64MB
-   */
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  public long getQueryId() {
+    return queryId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 7a5188a..e610877 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -101,6 +101,6 @@ public class ShowPlan extends PhysicalPlan {
   public enum ShowContentType {
     FLUSH_TASK_INFO, TTL, VERSION, TIMESERIES, STORAGE_GROUP, CHILD_PATH, DEVICES,
     COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES, MERGE_STATUS, FUNCTIONS, COUNT_DEVICES,
-    COUNT_STORAGE_GROUP
+    COUNT_STORAGE_GROUP, QUERY_PROCESSLIST
   }
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowQueryProcesslistPlan.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowQueryProcesslistPlan.java
index d900df7..124445d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowQueryProcesslistPlan.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,26 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
+package org.apache.iotdb.db.qp.physical.sys;
 
-public class Config {
+public class ShowQueryProcesslistPlan extends ShowPlan {
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 10000;
-  public static final int DEFAULT_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+  public ShowQueryProcesslistPlan(ShowContentType showContentType) {
+    super(showContentType);
+  }
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
-
-  /**
-   * thrift init buffer size, 1KB by default
-   */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
-
-  /**
-   * thrift max frame size (16384000 bytes by default), we change it to 64MB
-   */
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index d6c5db2..2910c61 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -22,6 +22,8 @@ import static org.apache.iotdb.db.index.common.IndexConstant.PATTERN;
 import static org.apache.iotdb.db.index.common.IndexConstant.THRESHOLD;
 import static org.apache.iotdb.db.index.common.IndexConstant.TOP_K;
 import static org.apache.iotdb.db.qp.constant.SQLConstant.TIME_PATH;
+import static org.apache.iotdb.db.qp.constant.SQLConstant.TOK_KILL_QUERY;
+import static org.apache.iotdb.db.qp.constant.SQLConstant.TOK_QUERY;
 
 import java.io.File;
 import java.time.ZoneId;
@@ -68,6 +70,7 @@ import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
 import org.apache.iotdb.db.qp.logical.sys.DropFunctionOperator;
 import org.apache.iotdb.db.qp.logical.sys.DropIndexOperator;
 import org.apache.iotdb.db.qp.logical.sys.FlushOperator;
+import org.apache.iotdb.db.qp.logical.sys.KillQueryOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType;
 import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
@@ -145,6 +148,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseParser.IndexWithClauseContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.InsertColumnsSpecContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.InsertStatementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.InsertValuesSpecContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.KillQueryContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.LastClauseContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.LastElementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.LimitClauseContext;
@@ -189,6 +193,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowDevicesContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowFlushTaskInfoContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowFunctionsContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowMergeStatusContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowQueryProcesslistContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowStorageGroupContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowTTLStatementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowTimeseriesContext;
@@ -737,6 +742,20 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
   }
 
   @Override
+  public Operator visitShowQueryProcesslist(ShowQueryProcesslistContext ctx) {
+    return new ShowOperator(SQLConstant.TOK_QUERY_PROCESSLIST);
+  }
+
+  @Override
+  public Operator visitKillQuery(KillQueryContext ctx) {
+    KillQueryOperator killQueryOperator = new KillQueryOperator(TOK_KILL_QUERY);
+    if (ctx.INT() != null) {
+      killQueryOperator.setQueryId(Long.parseLong(ctx.INT().getText())); // ids are long
+    }
+    return killQueryOperator;
+  }
+
+  @Override
   public Operator visitShowStorageGroup(ShowStorageGroupContext ctx) {
     if (ctx.prefixPath() != null) {
       return new ShowStorageGroupOperator(SQLConstant.TOK_STORAGE_GROUP,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index c906dfe..1f85a58 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -28,9 +28,9 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
+import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -55,6 +55,7 @@ import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
 import org.apache.iotdb.db.qp.logical.sys.DropFunctionOperator;
 import org.apache.iotdb.db.qp.logical.sys.DropIndexOperator;
 import org.apache.iotdb.db.qp.logical.sys.FlushOperator;
+import org.apache.iotdb.db.qp.logical.sys.KillQueryOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator;
 import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType;
 import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
@@ -99,6 +100,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurationPlanType;
 import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
@@ -112,11 +114,12 @@ import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowMergeStatusPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
+import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.udf.core.context.UDFContext;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SchemaUtils;
@@ -279,6 +282,8 @@ public class PhysicalGenerator {
           case SQLConstant.TOK_CHILD_PATHS:
             return new ShowChildPathsPlan(
                 ShowContentType.CHILD_PATH, ((ShowChildPathsOperator) operator).getPath());
+          case SQLConstant.TOK_QUERY_PROCESSLIST:
+            return new ShowQueryProcesslistPlan(ShowContentType.QUERY_PROCESSLIST);
           case SQLConstant.TOK_SHOW_FUNCTIONS:
             return new ShowFunctionsPlan(((ShowFunctionsOperator) operator).showTemporary());
           default:
@@ -305,6 +310,8 @@ public class PhysicalGenerator {
         return new DeletePartitionPlan(op.getStorageGroupName(), op.getPartitionId());
       case CREATE_SCHEMA_SNAPSHOT:
         return new CreateSnapshotPlan();
+      case KILL:
+        return new KillQueryPlan(((KillQueryOperator) operator).getQueryId());
       case CREATE_FUNCTION:
         CreateFunctionOperator createFunctionOperator = (CreateFunctionOperator) operator;
         return new CreateFunctionPlan(createFunctionOperator.isTemporary(),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index e1262ca..3344192 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -208,6 +208,9 @@ public class QueryResourceManager {
 
     // close and delete UDF temp files
     TemporaryQueryDataFileService.getInstance().deregister(queryId);
+
+    // remove query info in QueryTimeManager
+    QueryTimeManager.getInstance().unRegisterQuery(queryId);
   }
 
   private static class QueryTokenManagerHelper {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryTimeManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryTimeManager.java
new file mode 100644
index 0000000..72eed3e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryTimeManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.control;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to monitor the executing time of each query.
+ * <p>
+ * Once one is over the threshold, it will be killed and return the time out exception.
+ */
+public class QueryTimeManager implements IService {
+
+  private static final Logger logger = LoggerFactory.getLogger(QueryTimeManager.class);
+
+  /**
+   * the key of queryInfoMap is the query id and the value of queryInfoMap is the start
+   * time, the statement and executing thread of this query.
+   */
+  private Map<Long, QueryInfo> queryInfoMap;
+
+  private ScheduledExecutorService executorService;
+
+  private Map<Long, ScheduledFuture<?>> queryScheduledTaskMap;
+
+  private QueryTimeManager() {
+    queryInfoMap = new ConcurrentHashMap<>();
+    queryScheduledTaskMap = new ConcurrentHashMap<>();
+    executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(1,
+        "query-time-manager");
+  }
+
+  public void registerQuery(long queryId, long startTime, String sql, long timeout,
+      Thread queryThread) {
+    queryInfoMap.put(queryId, new QueryInfo(startTime, sql, queryThread));
+    // submit a scheduled task to judge whether query is still running after timeout
+    ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
+      // killQuery removes the entry itself, which a remapping function must not do
+      if (queryInfoMap.containsKey(queryId)) {
+        killQuery(queryId);
+        logger.warn("Query is time out with queryId {}", queryId);
+      }
+    }, timeout, TimeUnit.MILLISECONDS);
+    queryScheduledTaskMap.put(queryId, scheduledFuture);
+  }
+
+  public void killQuery(long queryId) {
+    QueryInfo queryInfo = queryInfoMap.get(queryId); // single lookup avoids a racy second get()
+    if (queryInfo != null) {
+      queryInfo.getThread().interrupt();
+      unRegisterQuery(queryId);
+    }
+  }
+
+  public void unRegisterQuery(long queryId) {
+    if (Thread.interrupted()) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+    }
+    queryInfoMap.remove(queryId);
+    // the task is absent if the query was never registered or is already unregistered
+    ScheduledFuture<?> timeoutTask = queryScheduledTaskMap.remove(queryId);
+    if (timeoutTask != null) {
+      timeoutTask.cancel(false);
+    }
+  }
+
+  public boolean isQueryInterrupted(long queryId) {
+    return queryInfoMap.get(queryId).getThread().isInterrupted();
+  }
+
+  public Map<Long, QueryInfo> getQueryInfoMap() {
+    return queryInfoMap;
+  }
+
+  public static QueryTimeManager getInstance() {
+    return QueryTimeManagerHelper.INSTANCE;
+  }
+
+  @Override
+  public void start() {
+    // Do Nothing
+  }
+
+  @Override
+  public void stop() {
+    if (executorService == null || executorService.isShutdown()) {
+      return;
+    }
+    executorService.shutdownNow();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.QUERY_TIME_MANAGER;
+  }
+
+  private static class QueryTimeManagerHelper {
+
+    private static final QueryTimeManager INSTANCE = new QueryTimeManager();
+
+    private QueryTimeManagerHelper() {
+    }
+  }
+
+  public class QueryInfo {
+
+    /**
+     *  To reduce the cost of memory, we only keep a statement of limited size.
+     *  For a statement longer than this, we keep only its head and tail.
+     */
+    private static final int MAX_STATEMENT_LENGTH = 64;
+
+    private final long startTime;
+    private final String statement;
+    /**
+     *  Only main thread is put in this structure since the sub threads are maintained
+     *  by the thread pool. The thread allocated for readTask will change every time, so
+     *  tracking it would mean accessing this map frequently, at a big performance cost.
+     */
+    private final Thread thread;
+
+    public QueryInfo(long startTime, String statement, Thread thread) {
+      this.startTime = startTime;
+      this.thread = thread;
+      if (statement.length() <= MAX_STATEMENT_LENGTH) {
+        this.statement = statement;
+      } else {
+        this.statement = statement.substring(0, MAX_STATEMENT_LENGTH / 2) + "..." + statement
+            .substring(statement.length() - MAX_STATEMENT_LENGTH / 2);
+      }
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public String getStatement() {
+      return statement;
+    }
+
+    public Thread getThread() {
+      return thread;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index c286fa3..16e58de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -68,6 +69,10 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
       PublicBAOS timeBAOS = new PublicBAOS();
       PublicBAOS valueBAOS = new PublicBAOS();
       try {
+        if (interrupted) {
+          // nothing is put here since before reading it the main thread will return
+          return;
+        }
         synchronized (reader) {
           // if the task is submitted, there must be free space in the queue
           // so here we don't need to check whether the queue has free space
@@ -189,6 +194,7 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
         }
       } catch (InterruptedException e) {
         LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+        interrupted = true;
         Thread.currentThread().interrupt();
       } catch (IOException e) {
         LOGGER.error("Something gets wrong while reading from the series reader: ", e);
@@ -226,6 +232,11 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
   // capacity for blocking queue
   private static final int BLOCKING_QUEUE_CAPACITY = 5;
 
+  /**
+   * flag that main thread is interrupted or not
+   */
+  private volatile boolean interrupted = false;
+
   private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
@@ -258,6 +269,11 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
   }
 
   private void init(WatermarkEncoder encoder, int fetchSize) {
+    if (Thread.interrupted()) {
+      interrupted = true;
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+    }
     initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size());
     this.fetchSize = fetchSize;
     for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
@@ -286,6 +302,12 @@ public class NonAlignEngineDataSet extends QueryDataSet implements DirectNonAlig
 
     for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
       if (!noMoreDataInQueueArray[seriesIndex]) {
+        // check the interrupted status of main thread before take next batch
+        if (Thread.interrupted()) {
+          interrupted = true;
+          throw new QueryTimeoutRuntimeException(
+              QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+        }
         Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex]
             .take();
         if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 0dccfe2..c1ebc7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -28,6 +28,7 @@ import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -49,14 +50,14 @@ import org.slf4j.LoggerFactory;
 public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
     DirectAlignByTimeDataSet {
 
-  private static class ReadTask extends WrappedRunnable {
+  private class ReadTask extends WrappedRunnable {
 
     private final ManagedSeriesReader reader;
     private final String pathName;
     private BlockingQueue<BatchData> blockingQueue;
 
-    public ReadTask(ManagedSeriesReader reader,
-        BlockingQueue<BatchData> blockingQueue, String pathName) {
+    public ReadTask(ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue,
+        String pathName) {
       this.reader = reader;
       this.blockingQueue = blockingQueue;
       this.pathName = pathName;
@@ -65,6 +66,10 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
     @Override
     public void runMayThrow() {
       try {
+        // check the status of mainThread before next reading
+        if (interrupted) {
+          return;
+        }
         synchronized (reader) {
           // if the task is submitted, there must be free space in the queue
           // so here we don't need to check whether the queue has free space
@@ -141,6 +146,11 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
   // capacity for blocking queue
   private static final int BLOCKING_QUEUE_CAPACITY = 5;
 
+  /**
+   * flag indicating whether the main thread has been interrupted
+   */
+  private volatile boolean interrupted = false;
+
   private static final QueryTaskPoolManager TASK_POOL_MANAGER = QueryTaskPoolManager.getInstance();
 
   private static final Logger LOGGER = LoggerFactory
@@ -178,6 +188,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
           .submit(new ReadTask(reader, blockingQueueArray[i], paths.get(i).getFullPath()));
     }
     for (int i = 0; i < seriesReaderList.size(); i++) {
+      // check the interrupted status of main thread before taking next batch
+      if (Thread.interrupted()) {
+        interrupted = true;
+        throw new QueryTimeoutRuntimeException(
+            QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+      }
       fillCache(i);
       // try to put the next timestamp into the heap
       if (cachedBatchDataArray[i] != null && cachedBatchDataArray[i].hasCurrent()) {
@@ -283,7 +299,13 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
           // move next
           cachedBatchDataArray[seriesIndex].next();
 
-          // get next batch if current batch is empty and  still have remaining batch data in queue
+          // check the interrupted status of main thread before taking next batch
+          if (Thread.interrupted()) {
+            interrupted = true;
+            throw new QueryTimeoutRuntimeException(
+                QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+          }
+          // get next batch if current batch is empty and still have remaining batch data in queue
           if (!cachedBatchDataArray[seriesIndex].hasCurrent()
               && !noMoreDataInQueueArray[seriesIndex]) {
             fillCache(seriesIndex);
@@ -430,6 +452,12 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
         // move next
         cachedBatchDataArray[seriesIndex].next();
 
+        // check the interrupted status of main thread before taking next batch
+        if (Thread.interrupted()) {
+          interrupted = true;
+          throw new QueryTimeoutRuntimeException(
+              QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+        }
         // get next batch if current batch is empty and still have remaining batch data in queue
         if (!cachedBatchDataArray[seriesIndex].hasCurrent()
             && !noMoreDataInQueueArray[seriesIndex]) {
@@ -437,7 +465,10 @@ public class RawQueryDataSetWithoutValueFilter extends QueryDataSet implements
             fillCache(seriesIndex);
           } catch (InterruptedException e) {
             LOGGER.error("Interrupted while taking from the blocking queue: ", e);
+            interrupted = true;
             Thread.currentThread().interrupt();
+            throw new QueryTimeoutRuntimeException(
+                QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
           } catch (IOException e) {
             LOGGER.error("Got IOException", e);
             throw e;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
index a4cb4f9..ea12e37 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/RawDataQueryExecutor.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -70,7 +71,8 @@ public class RawDataQueryExecutor {
           queryPlan.getDeduplicatedDataTypes(), readersOfSelectedSeries, queryPlan.isAscending());
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new StorageEngineException(e.getMessage());
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
       throw new StorageEngineException(e.getMessage());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index b433bae..0d15e76 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -173,6 +174,10 @@ public class SeriesReader {
   }
 
   boolean hasNextFile() throws IOException {
+    if (Thread.interrupted()) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+    }
 
     if (!unSeqPageReaders.isEmpty()
         || firstPageReader != null
@@ -232,6 +237,11 @@ public class SeriesReader {
    * overlapped chunks are consumed
    */
   boolean hasNextChunk() throws IOException {
+    if (Thread.interrupted()) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+    }
+
     if (!unSeqPageReaders.isEmpty()
         || firstPageReader != null
         || mergeReader.hasNextTimeValuePair()) {
@@ -351,6 +361,10 @@ public class SeriesReader {
   @SuppressWarnings("squid:S3776")
   // Suppress high Cognitive Complexity warning
   boolean hasNextPage() throws IOException {
+    if (Thread.interrupted()) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+    }
 
     /*
      * has overlapped data before
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 835201a..7757dd1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -45,6 +45,7 @@ public enum ServiceType {
   TEMPORARY_QUERY_DATA_FILE_SERVICE("Temporary Query Data File Service", ""),
   CACHE_HIT_RATIO_DISPLAY_SERVICE("CACHE_HIT_RATIO_DISPLAY_SERVICE",
       generateJmxName(IoTDBConstant.IOTDB_PACKAGE, "Cache Hit Ratio")),
+  QUERY_TIME_MANAGER("Query time manager", "Query time"),
 
   FLUSH_SERVICE("Flush ServerService",
       generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager")),
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0ad6020..261d0f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metrics.server.SqlArgument;
@@ -88,9 +89,11 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.control.QueryTimeManager;
 import org.apache.iotdb.db.query.control.TracingManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
@@ -200,6 +203,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   private static final AtomicInteger queryCount = new AtomicInteger(0);
 
+  private QueryTimeManager queryTimeManager = QueryTimeManager.getInstance();
+
   public TSServiceImpl() throws QueryProcessException {
     processor = new Planner();
     executor = new PlanExecutor();
@@ -470,7 +475,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(statement, req.statementId, physicalPlan,
-          req.fetchSize, sessionIdUsernameMap.get(req.getSessionId()))
+          req.fetchSize, req.timeout, sessionIdUsernameMap.get(req.getSessionId()))
           : executeUpdateStatement(physicalPlan, req.getSessionId());
     } catch (Exception e) {
       return RpcUtils.getTSExecuteStatementResp(onQueryException(e, "executing executeStatement"));
@@ -491,7 +496,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
-          sessionIdUsernameMap.get(req.getSessionId()))
+          req.timeout, sessionIdUsernameMap.get(req.getSessionId()))
           : RpcUtils.getTSExecuteStatementResp(
               TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
     } catch (Exception e) {
@@ -511,7 +516,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
           .rawDataQueryReqToPhysicalPlan(req, sessionIdZoneIdMap.get(req.getSessionId()));
       return physicalPlan.isQuery()
           ? internalExecuteQueryStatement("", req.statementId, physicalPlan, req.fetchSize,
-          sessionIdUsernameMap.get(req.getSessionId()))
+          config.getQueryTimeThreshold(), sessionIdUsernameMap.get(req.getSessionId()))
           : RpcUtils.getTSExecuteStatementResp(TSStatusCode.EXECUTE_STATEMENT_ERROR,
               "Statement is not a query statement.");
     } catch (Exception e) {
@@ -525,8 +530,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
    *             some AuthorPlan
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
-      long statementId, PhysicalPlan plan, int fetchSize, String username)
+  private TSExecuteStatementResp internalExecuteQueryStatement(String statement, long statementId,
+      PhysicalPlan plan, int fetchSize, long timeout, String username)
       throws QueryProcessException, SQLException, StorageEngineException, QueryFilterOptimizationException, MetadataException, IOException, InterruptedException, TException, AuthException {
     queryCount.incrementAndGet();
     AUDIT_LOGGER.debug("Session {} execute Query: {}", currSessionId.get(), statement);
@@ -579,12 +584,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       // generate the queryId for the operation
       queryId = generateQueryId(true, fetchSize, deduplicatedPathNum);
+      // register query info to queryTimeManager
+      if (!(plan instanceof ShowQueryProcesslistPlan)) {
+        queryTimeManager
+            .registerQuery(queryId, startTime, statement, timeout, Thread.currentThread());
+      }
       if (plan instanceof QueryPlan && config.isEnablePerformanceTracing()) {
+        TracingManager tracingManager = TracingManager.getInstance();
         if (!(plan instanceof AlignByDevicePlan)) {
-          TracingManager.getInstance()
-              .writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
+          tracingManager.writeQueryInfo(queryId, statement, startTime, plan.getPaths().size());
         } else {
-          TracingManager.getInstance().writeQueryInfo(queryId, statement, startTime);
+          tracingManager.writeQueryInfo(queryId, statement, startTime);
         }
       }
 
@@ -611,7 +621,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       resp.setOperationType(plan.getOperatorType().toString());
       if (plan.getOperatorType() == OperatorType.AGGREGATION) {
         resp.setIgnoreTimeStamp(true);
-      } // else default ignoreTimeStamp is false
+      } else if (plan instanceof ShowQueryProcesslistPlan) {
+        resp.setIgnoreTimeStamp(false);
+      }
 
       if (newDataSet instanceof DirectNonAlignDataSet) {
         resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username));
@@ -636,6 +648,10 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         }
       }
 
+      // remove query info in QueryTimeManager
+      if (!(plan instanceof ShowQueryProcesslistPlan)) {
+        queryTimeManager.unRegisterQuery(queryId);
+      }
       return resp;
     } catch (Exception e) {
       releaseQueryResourceNoExceptions(queryId);
@@ -837,6 +853,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
             RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "Has not executed query"));
       }
 
+      // register query info to queryTimeManager
+      queryTimeManager
+          .registerQuery(req.queryId, System.currentTimeMillis(), req.statement, req.timeout,
+              Thread.currentThread());
+
       QueryDataSet queryDataSet = queryId2DataSet.get(req.queryId);
       if (req.isAlign) {
         TSQueryDataSet result =
@@ -849,6 +870,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp.setHasResultSet(hasResultSet);
         resp.setQueryDataSet(result);
         resp.setIsAlign(true);
+
+        queryTimeManager.unRegisterQuery(req.queryId);
         return resp;
       } else {
         TSQueryNonAlignDataSet nonAlignResult =
@@ -868,6 +891,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         resp.setHasResultSet(hasResultSet);
         resp.setNonAlignQueryDataSet(nonAlignResult);
         resp.setIsAlign(false);
+
+        queryTimeManager.unRegisterQuery(req.queryId);
         return resp;
       }
     } catch (Exception e) {
@@ -888,9 +913,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
   private TSQueryNonAlignDataSet fillRpcNonAlignReturnData(
       int fetchSize, QueryDataSet queryDataSet, String userName)
-      throws TException, AuthException, InterruptedException, IOException, QueryProcessException {
+      throws TException, AuthException, IOException, QueryProcessException {
     WatermarkEncoder encoder = getWatermarkEncoder(userName);
-    return ((DirectNonAlignDataSet) queryDataSet).fillBuffer(fetchSize, encoder);
+    try {
+      return ((DirectNonAlignDataSet) queryDataSet).fillBuffer(fetchSize, encoder);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
+    }
   }
 
   private WatermarkEncoder getWatermarkEncoder(String userName) throws TException, AuthException {
@@ -1573,7 +1604,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   }
 
   private TSStatus tryCatchQueryException(Exception e) {
-    if (e instanceof ParseCancellationException) {
+    if (e instanceof QueryTimeoutRuntimeException) {
+      DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(e.getMessage(), e);
+      // just recover the state of thread here
+      if (Thread.interrupted()) {
+        DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn("Recover the state of the thread interrupted");
+      }
+      return RpcUtils.getStatus(TSStatusCode.TIME_OUT, getRootCause(e));
+    } else if (e instanceof ParseCancellationException) {
       DETAILED_FAILURE_QUERY_TRACE_LOGGER.warn(INFO_PARSING_SQL_ERROR, e);
       return RpcUtils
           .getStatus(TSStatusCode.SQL_PARSE_ERROR, INFO_PARSING_SQL_ERROR + getRootCause(e));
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
index 8be642c..c588f94 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IOTDBGroupByIT.java
@@ -41,7 +41,6 @@ import static org.junit.Assert.fail;
 
 public class IOTDBGroupByIT {
 
-
   private static String[] dataSet1 = new String[]{
       "SET STORAGE GROUP TO root.ln.wf01.wt01",
       "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBKillQueryTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBKillQueryTest.java
new file mode 100644
index 0000000..86cb6ac
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBKillQueryTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBKillQueryTest {
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Test killing a query with a specified query id that does not exist.
+   */
+  @Test
+  public void killQueryTest1() {
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      statement.execute("kill query 998");
+      fail("QueryIdNotExistException is not thrown");
+    } catch (IoTDBSQLException e) {
+      Assert.assertTrue(e.getMessage().contains("Query Id 998 is not exist"));
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Test killing query without explicit query id. It's supposed to run successfully.
+   */
+  @Test
+  public void killQueryTest2() {
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("kill query");
+      Assert.assertEquals(false, hasResultSet);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryTimeoutTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryTimeoutTest.java
new file mode 100644
index 0000000..6a2bb99
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryTimeoutTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.Statement;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
+import org.apache.iotdb.jdbc.IoTDBStatement;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class IoTDBQueryTimeoutTest {
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Test show query processlist; there is supposed to be no result.
+   */
+  @Test
+  public void queryProcessListTest() {
+    String headerResult = "Time, queryId, statement, ";
+
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("show query processlist");
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      ResultSetMetaData metaData = resultSet.getMetaData();
+      Assert.assertEquals(3, metaData.getColumnCount());
+      StringBuilder headerBuilder = new StringBuilder();
+      for (int i = 1; i <= metaData.getColumnCount(); i++) {
+        headerBuilder.append(metaData.getColumnName(i)).append(", ");
+      }
+      Assert.assertEquals(headerResult, headerBuilder.toString());
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        cnt++;
+      }
+      Assert.assertEquals(0, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Test query with timeout, which is supposed to throw a QueryTimeoutRuntimeException.
+   * Note: This test is not guaranteed to time out.
+   */
+  @Test
+  public void queryWithTimeoutTest() {
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      statement.setFetchSize(20000);
+      try {
+        ((IoTDBStatement) statement)
+            .executeQuery("select count(*) from root group by ([1, 40000), 2ms)", 1);
+        fail("QueryTimeoutRuntimeException is not thrown");
+      } catch (IoTDBSQLException e) {
+        Assert.assertTrue(e.getMessage().contains("Current query is time out"));
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Runs a normal query after a query that may time out; the later query must still succeed.
+   */
+  @Test
+  public void queryAfterTimeoutQueryTest() {
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      statement.setFetchSize(20000);
+      try {
+        ((IoTDBStatement) statement)
+            .executeQuery("select count(*) from root group by ([1, 20000), 2ms)", 1);
+      } catch (IoTDBSQLException e) {
+        Assert.assertTrue(e.getMessage().contains("Current query is time out"));
+      }
+
+      boolean hasResultSet = statement.execute("select max_time(s1) from root.sg1.d1");
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      // prepareData() writes root.sg1.d1.s1; require a row so an empty result cannot pass silently
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(40000, resultSet.getLong("max_time(root.sg1.d1.s1)"));
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private static void prepareData() {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (int i = 0; i <= 40000; i++) {
+        statement.execute(String.format("insert into root.sg1.d1(time,s1) values(%d,%d)", i, i));
+        statement.execute(String.format("insert into root.sg2.d2(time,s2) values(%d,%d)", i, i));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 86757ba..8ed69c2 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -53,6 +53,7 @@ public class IoTDBRpcDataSet {
   public Map<String, Integer> columnOrdinalMap; // used because the server returns deduplicated columns
   public List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList
   public int fetchSize;
+  public final long timeout;
   public boolean emptyResultSet = false;
   public boolean hasCachedRecord = false;
   public boolean lastReadWasNull;
@@ -77,13 +78,14 @@ public class IoTDBRpcDataSet {
   public IoTDBRpcDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
       Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
       long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet,
-      int fetchSize) {
+      int fetchSize, long timeout) {
     this.sessionId = sessionId;
     this.ignoreTimeStamp = ignoreTimeStamp;
     this.sql = sql;
     this.queryId = queryId;
     this.client = client;
     this.fetchSize = fetchSize;
+    this.timeout = timeout;
     columnSize = columnNameList.size();
 
     this.columnNameList = new ArrayList<>();
@@ -200,7 +202,8 @@ public class IoTDBRpcDataSet {
 
   public boolean fetchResults() throws StatementExecutionException, IoTDBConnectionException {
     rowsIndex = 0;
-    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
+    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true,
+        timeout);
     try {
       TSFetchResultsResp resp = client.fetchResults(req);
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b839c53..a6f1ee5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -60,6 +60,7 @@ public enum TSStatusCode {
   QUERY_PROCESS_ERROR(411),
   WRITE_PROCESS_ERROR(412),
   WRITE_PROCESS_REJECT(413),
+  QUERY_ID_NOT_EXIST(414),
 
   UNSUPPORTED_INDEX_FUNC_ERROR(421),
   UNSUPPORTED_INDEX_TYPE_ERROR(422),
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index d900df7..c879a2d 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -23,7 +23,8 @@ public class Config {
   public static final String DEFAULT_USER = "root";
   public static final String DEFAULT_PASSWORD = "root";
   public static final int DEFAULT_FETCH_SIZE = 10000;
-  public static final int DEFAULT_TIMEOUT_MS = 0;
+  public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
+  public static final int DEFAULT_QUERY_TIMEOUT_MS = 60000;
   public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
 
   public static final int RETRY_NUM = 3;
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index a27dd31..95b0242 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -135,11 +135,11 @@ public class Session {
   }
 
   public synchronized void open() throws IoTDBConnectionException {
-    open(false, Config.DEFAULT_TIMEOUT_MS);
+    open(false, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException {
-    open(enableRPCCompression, Config.DEFAULT_TIMEOUT_MS);
+    open(enableRPCCompression, Config.DEFAULT_CONNECTION_TIMEOUT_MS);
   }
 
   private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs)
@@ -296,7 +296,7 @@ public class Session {
   }
 
   /**
-   * execure query sql
+   * execute query sql
    *
    * @param sql query statement
    * @return result set
@@ -307,6 +307,21 @@ public class Session {
   }
 
   /**
+   * execute query sql with explicit timeout
+   *
+   * @param sql query statement
+   * @param timeout the timeout of this query, in milliseconds
+   * @return result set
+   */
+  public SessionDataSet executeQueryStatement(String sql, long timeout)
+      throws StatementExecutionException, IoTDBConnectionException {
+    if (timeout <= 0) {
+      throw new StatementExecutionException("Timeout must be over 0, please check and try again.");
+    }
+    return defaultSessionConnection.executeQueryStatement(sql, timeout);
+  }
+
+  /**
    * execute non query statement
    *
    * @param sql non query statement
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 5ad0b93..8dacf21 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -258,8 +258,14 @@ public class SessionConnection {
 
   protected SessionDataSet executeQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
+    return this.executeQueryStatement(sql, Config.DEFAULT_QUERY_TIMEOUT_MS);
+  }
+
+  protected SessionDataSet executeQueryStatement(String sql, long timeout)
+      throws StatementExecutionException, IoTDBConnectionException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
     execReq.setFetchSize(session.fetchSize);
+    execReq.setTimeout(timeout);
     TSExecuteStatementResp execResp;
     try {
       execResp = client.executeQueryStatement(execReq);
@@ -282,9 +288,10 @@ public class SessionConnection {
     return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(),
         execResp.columnNameIndexMap,
         execResp.getQueryId(), client, sessionId, execResp.queryDataSet,
-        execResp.isIgnoreTimeStamp());
+        execResp.isIgnoreTimeStamp(), timeout);
   }
 
+
   protected void executeNonQueryStatement(String sql)
       throws IoTDBConnectionException, StatementExecutionException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index dbe078d..5caadba 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -46,7 +46,17 @@ public class SessionDataSet {
       long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet,
       boolean ignoreTimeStamp) {
     this.ioTDBRpcDataSet = new IoTDBRpcDataSet(sql, columnNameList, columnTypeList, columnNameIndex,
-        ignoreTimeStamp, queryId, client, sessionId, queryDataSet, Config.DEFAULT_FETCH_SIZE);
+        ignoreTimeStamp, queryId, client, sessionId, queryDataSet, Config.DEFAULT_FETCH_SIZE,
+        Config.DEFAULT_QUERY_TIMEOUT_MS);
+  }
+
+  public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
+      Map<String, Integer> columnNameIndex,
+      long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet,
+      boolean ignoreTimeStamp, long timeout) {
+    this.ioTDBRpcDataSet = new IoTDBRpcDataSet(sql, columnNameList, columnTypeList, columnNameIndex,
+        ignoreTimeStamp, queryId, client, sessionId, queryDataSet, Config.DEFAULT_FETCH_SIZE,
+        timeout);
   }
 
   public int getFetchSize() {
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
index 439be74..b237437 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIteratorIT.java
@@ -242,6 +242,35 @@ public class IoTDBSessionIteratorIT {
     }
   }
 
+  /**
+   * Test executeQueryStatement with a timeout; the query is expected to finish without timing out.
+   */
+  @Test
+  public void queryWithTimeoutTest() {
+    String[] retArray = new String[]{
+        "9,root.sg1.d1.s1,false"
+    };
+
+    try {
+      SessionDataSet sessionDataSet = session
+          .executeQueryStatement("select last s1 from root.sg1.d1", 2000);
+      sessionDataSet.setFetchSize(1024);
+      DataIterator iterator = sessionDataSet.iterator();
+      int count = 0;
+      while (iterator.next()) {
+        String ans = String.format("%s,%s,%s", iterator.getLong(1), iterator.getString(2),
+            iterator.getString(3));
+        assertEquals(retArray[count], ans);
+        count++;
+      }
+      assertEquals(retArray.length, count);
+      sessionDataSet.closeOperationHandle();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
   private void prepareData() throws IoTDBConnectionException, StatementExecutionException {
     session = new Session("127.0.0.1", 6667, "root", "root");
     session.open();
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 4f30d06..a1a7ea5 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -115,6 +115,8 @@ struct TSExecuteStatementReq {
   3: required i64 statementId
 
   4: optional i32 fetchSize
+
+  5: optional i64 timeout
 }
 
 struct TSExecuteBatchStatementReq{
@@ -153,6 +155,7 @@ struct TSFetchResultsReq{
   3: required i32 fetchSize
   4: required i64 queryId
   5: required bool isAlign
+  6: required i64 timeout
 }
 
 struct TSFetchResultsResp{
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/QueryTimeoutRuntimeException.java
similarity index 50%
copy from session/src/main/java/org/apache/iotdb/session/Config.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/exception/QueryTimeoutRuntimeException.java
index d900df7..1f062c5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/QueryTimeoutRuntimeException.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,26 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.session;
+package org.apache.iotdb.tsfile.exception;
 
-public class Config {
+/**
+ * Runtime exception thrown to signal that a query has timed out.
+ */
+public class QueryTimeoutRuntimeException extends RuntimeException {
 
-  public static final String DEFAULT_USER = "root";
-  public static final String DEFAULT_PASSWORD = "root";
-  public static final int DEFAULT_FETCH_SIZE = 10000;
-  public static final int DEFAULT_TIMEOUT_MS = 0;
-  public static final boolean DEFAULT_CACHE_LEADER_MODE = true;
+  public static final String TIMEOUT_EXCEPTION_MESSAGE
+      = "Current query is time out, please check your statement or modify timeout parameter.";
 
-  public static final int RETRY_NUM = 3;
-  public static final long RETRY_INTERVAL_MS = 1000;
+  public QueryTimeoutRuntimeException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
-  /**
-   * thrift init buffer size, 1KB by default
-   */
-  public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024;
+  public QueryTimeoutRuntimeException(String message) {
+    super(message);
+  }
 
-  /**
-   * thrift max frame size (16384000 bytes by default), we change it to 64MB
-   */
-  public static final int DEFAULT_MAX_FRAME_SIZE = 67108864;
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
index de314b1..0f4f531 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
@@ -22,9 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import org.apache.iotdb.tsfile.exception.QueryTimeoutRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +46,9 @@ public class LocalTsFileInput implements TsFileInput {
   public long size() throws IOException {
     try {
       return channel.size();
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
       logger.error("Error happened while getting {} size", filePath);
       throw e;
@@ -54,6 +59,9 @@ public class LocalTsFileInput implements TsFileInput {
   public long position() throws IOException {
     try {
       return channel.position();
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
       logger.error("Error happened while getting {} current position", filePath);
       throw e;
@@ -65,6 +73,9 @@ public class LocalTsFileInput implements TsFileInput {
     try {
       channel.position(newPosition);
       return this;
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
       logger.error("Error happened while changing {} position to {}", filePath, newPosition);
       throw e;
@@ -75,6 +86,9 @@ public class LocalTsFileInput implements TsFileInput {
   public int read(ByteBuffer dst) throws IOException {
     try {
       return channel.read(dst);
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
       logger.error("Error happened while reading {} from current position", filePath);
       throw e;
@@ -85,6 +99,9 @@ public class LocalTsFileInput implements TsFileInput {
   public int read(ByteBuffer dst, long position) throws IOException {
     try {
       return channel.read(dst, position);
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
       logger.error("Error happened while reading {} from position {}", filePath, position);
       throw e;
@@ -115,6 +132,9 @@ public class LocalTsFileInput implements TsFileInput {
   public void close() throws IOException {
     try {
       channel.close();
+    } catch (ClosedByInterruptException e) {
+      throw new QueryTimeoutRuntimeException(
+          QueryTimeoutRuntimeException.TIMEOUT_EXCEPTION_MESSAGE);
     } catch (IOException e) {
       logger.error("Error happened while closing {}", filePath);
       throw e;