You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/06/28 13:06:34 UTC

[iotdb] branch master updated: [IOTDB-1143] Continuous query (#3162)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a862ee  [IOTDB-1143] Continuous query (#3162)
5a862ee is described below

commit 5a862eef042ce365da1bcf5b2bb4999a6b897357
Author: mzp0514 <34...@users.noreply.github.com>
AuthorDate: Mon Jun 28 21:04:22 2021 +0800

    [IOTDB-1143] Continuous query (#3162)
---
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |  67 ++++
 .../Administration-Management/Administration.md    |   2 +
 .../Advanced-Features/Continuous-Query.md          | 230 +++++++++++
 .../Administration-Management/Administration.md    |   2 +
 .../Advanced-Features/Continuous-Query.md          | 232 +++++++++++
 .../main/java/org/apache/iotdb/SessionExample.java |  21 +
 .../resources/conf/iotdb-engine.properties         |  18 +
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   5 +-
 .../apache/iotdb/db/auth/entity/PrivilegeType.java |   2 +
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  42 ++
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   6 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  26 ++
 .../apache/iotdb/db/cq/ContinuousQueryService.java | 218 ++++++++++
 .../apache/iotdb/db/cq/ContinuousQueryTask.java    | 302 ++++++++++++++
 .../db/cq/ContinuousQueryTaskPoolManager.java      | 106 +++++
 .../ContinuousQueryException.java}                 |  24 +-
 .../apache/iotdb/db/metadata/MLogTxtWriter.java    |  25 ++
 .../org/apache/iotdb/db/metadata/MManager.java     |  12 +
 .../iotdb/db/metadata/MetadataOperationType.java   |   2 +
 .../iotdb/db/metadata/logfile/MLogWriter.java      |  12 +
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |  10 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |   8 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  58 +++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   4 +
 .../iotdb/db/qp/logical/crud/SelectComponent.java  |   7 +
 .../logical/sys/CreateContinuousQueryOperator.java |  98 +++++
 ...rator.java => DropContinuousQueryOperator.java} |  38 +-
 .../sys/ShowContinuousQueriesOperator.java}        |  22 +-
 .../iotdb/db/qp/logical/sys/ShowOperator.java      |   3 +
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  13 +-
 .../qp/physical/sys/CreateContinuousQueryPlan.java | 145 +++++++
 .../qp/physical/sys/DropContinuousQueryPlan.java   |  63 +++
 .../physical/sys/ShowContinuousQueriesPlan.java}   |  21 +-
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   3 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    | 196 ++++++++-
 .../apache/iotdb/db/qp/utils/DatetimeUtils.java    |  24 ++
 .../query/dataset/ShowContinuousQueriesResult.java |  84 ++++
 .../query/dataset/groupby/GroupByTimeDataSet.java  |  11 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   2 +
 .../org/apache/iotdb/db/service/ServiceType.java   |   2 +-
 .../org/apache/iotdb/db/tools/mlog/MLogParser.java |   8 +
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |  23 ++
 .../apache/iotdb/db/auth/AuthorityCheckerTest.java |  19 +
 .../db/integration/IoTDBContinuousQueryIT.java     | 444 +++++++++++++++++++++
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     | 218 ++++++++++
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   8 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 site/src/main/.vuepress/config.js                  |  10 +-
 49 files changed, 2801 insertions(+), 97 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index 9d05e7a..41235f4 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -105,6 +105,11 @@ statement
     | STOP TRIGGER triggerName=ID #stopTrigger
     | SHOW TRIGGERS #showTriggers
     | selectClause fromClause whereClause? specialClause? #selectStatement
+    | CREATE (CONTINUOUS QUERY | CQ) continuousQueryName=ID
+      resampleClause?
+      cqSelectIntoClause #createContinuousQueryStatement
+    | DROP (CONTINUOUS QUERY | CQ) continuousQueryName=ID #dropContinuousQueryStatement
+    | SHOW (CONTINUOUS QUERIES | CQS) #showContinuousQueriesStatement
     ;
 
 selectClause
@@ -329,6 +334,25 @@ sequenceClause
     : LR_BRACKET constant (COMMA constant)* RR_BRACKET
     ;
 
+resampleClause
+    : RESAMPLE (EVERY DURATION)? (FOR DURATION)?;
+
+cqSelectIntoClause
+    : BEGIN
+    selectClause
+    INTO (fullPath | nodeNameWithoutStar)
+    fromClause
+    cqGroupByTimeClause
+    END
+    ;
+
+cqGroupByTimeClause
+    : GROUP BY TIME LR_BRACKET
+      DURATION
+      RR_BRACKET
+      (COMMA LEVEL OPERATOR_EQ INT)?
+    ;
+
 comparisonOperator
     : type = OPERATOR_GT
     | type = OPERATOR_GTE
@@ -608,6 +632,13 @@ nodeNameWithoutStar
     | PARTITION
     | DESC
     | ASC
+    | CONTINUOUS
+    | CQ
+    | CQS
+    | BEGIN
+    | END
+    | RESAMPLE
+    | EVERY
     ;
 
 dataType
@@ -1220,6 +1251,38 @@ EXPLAIN
     : E X P L A I N
     ;
 
+CONTINUOUS
+    : C O N T I N U O U S
+    ;
+
+QUERIES
+    : Q U E R I E S
+    ;
+
+CQ
+    : C Q
+    ;
+
+CQS
+    : C Q S
+    ;
+
+BEGIN
+    : B E G I N
+    ;
+
+END
+    : E N D
+    ;
+
+RESAMPLE
+    : R E S A M P L E
+    ;
+
+EVERY
+    : E V E R Y
+    ;
+
 DEBUG
     : D E B U G
     ;
@@ -1346,6 +1409,8 @@ NAME_CHAR
     |   '%'
     |   '&'
     |   '+'
+    |   '{'
+    |   '}'
     |   CN_CHAR
     ;
 
@@ -1362,6 +1427,8 @@ FIRST_NAME_CHAR
     |   '%'
     |   '&'
     |   '+'
+    |   '{'
+    |   '}'
     |   CN_CHAR
     ;
 
diff --git a/docs/UserGuide/Administration-Management/Administration.md b/docs/UserGuide/Administration-Management/Administration.md
index f01f8c9..392b6ed 100644
--- a/docs/UserGuide/Administration-Management/Administration.md
+++ b/docs/UserGuide/Administration-Management/Administration.md
@@ -148,6 +148,8 @@ At the same time, changes to roles are immediately reflected on all users who ow
 |DROP_TRIGGER|drop triggers; path independent|
 |START_TRIGGER|start triggers; path independent|
 |STOP_TRIGGER|stop triggers; path independent|
+|CREATE_CONTINUOUS_QUERY|create continuous queries; path independent|
+|DROP_CONTINUOUS_QUERY|drop continuous queries; path independent|
 
 ### Username Restrictions
 
diff --git a/docs/UserGuide/Advanced-Features/Continuous-Query.md b/docs/UserGuide/Advanced-Features/Continuous-Query.md
new file mode 100644
index 0000000..3cea3ad
--- /dev/null
+++ b/docs/UserGuide/Advanced-Features/Continuous-Query.md
@@ -0,0 +1,230 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# Continuous Query, CQ
+
+We can create, drop a CQ, 
+and query all registered CQ configuration information through SQL statements.
+
+## SQL statements
+
+### Create CQ
+
+#### Syntax
+
+```sql
+CREATE CONTINUOUS QUERY <cq_id> 
+[RESAMPLE EVERY <every_interval> FOR <for_interval>]
+BEGIN
+SELECT <function>(<path_suffix>) INTO <full_path> | <node_name>
+FROM <path_prefix>
+GROUP BY time(<group_by_interval>) [, level = <level>]
+END
+```
+
+* `<cq_id>` specifies the globally unique id of CQ.
+* `<every_interval>` specifies the query execution time interval. We currently support the units of ns, us, ms, s, m, h, d, w, and its value should not be lower than the minimum threshold configured by the user. 
+* `<for_interval>` specifies the time range of each query as `[now()-<for_interval>, now())`. We currently support the units of ns, us, ms, s, m, h, d, w.
+* `<function>` specifies the aggregate function.
+* `<path_prefix>` and `<path_suffix>` are spliced into the queried time series path.
+* `<full_path>` or `<node_name>` specifies the result time series path.
+* `<group_by_interval>` specifies the time grouping length. We currently support the units of ns, us, ms, s, m, h, d, w, mo, y.
+* `<level>` refers to grouping according to the `<level>` level of the time series, and aggregates all time series below the `<level>` level. For the specific semantics of the Group By Level statement and the definition of `<level>`, see [aggregation-by-level](../IoTDB-SQL-Language/DML-Data-Manipulation-Language.md)
+
+
+Note:
+* `<for_interval>`,`<every_interval>` can optionally be specified. If the user does not specify one of them, the value of the unspecified item will be processed equal to `<group_by_interval>`.
+    * The values of `<every_interval>`, `<for_interval>` and `<group_by_interval>` should all be greater than 0.
+    * The value of `<group_by_interval>` should be less than the value of `<for_interval>`, otherwise the system will process the value equal to `<for_interval>`.
+    * The user should specify the appropriate `<for_interval>` and `<every_interval>` according to actual needs.
+        * If `<for_interval>` is greater than `<every_interval>`, there will be partial data overlap in each query window. This configuration is not recommended from the perspective of query performance.
+        * If `<for_interval>` is less than `<every_interval>`, there may be uncovered data between each query window.
+* For the result series path
+    * The user can choose to specify `<full_path>`, which is the complete time series path starting with `root`. The user can use the `${x}` variable in the path to represent the node name of `level = x` in the original time series. `x` should be greater than or equal to 0 and less than or equal to the value of `<level>`
+      (If `level` is not specified, it should be less than or equal to the level, i.e. length, of `<path_prefix>`).
+    * The user can also specify only `<node_name>`, which is the last node name of the result time series path.
+        * If the user specifies `<level> = l`, the result time series path generated by the system is `root.${1}. ... .${l}.<node_name>`
+        * If the user does not specify `<level>`, let the maximum level of the original time series be `L`,
+          Then the result time series path generated by the system is `root.${1}. ... .${L-1}.<node_name>`.
+
+#### Examples
+
+##### Original Data
+````
++-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|                   timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
++-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|root.ln.wf02.wt02.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf02.wt01.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.wt02.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.wt01.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
++-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+````
+
+````
++-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|                         Time|root.ln.wf02.wt02.temperature|root.ln.wf02.wt01.temperature|root.ln.wf01.wt02.temperature|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|2021-05-11T22:18:14.598+08:00|                        121.0|                         72.0|                        183.0|                        115.0|
+|2021-05-11T22:18:19.941+08:00|                          0.0|                         68.0|                         68.0|                        103.0|
+|2021-05-11T22:18:24.949+08:00|                        122.0|                         45.0|                         11.0|                         14.0|
+|2021-05-11T22:18:29.967+08:00|                         47.0|                         14.0|                         59.0|                        181.0|
+|2021-05-11T22:18:34.979+08:00|                        182.0|                        113.0|                         29.0|                        180.0|
+|2021-05-11T22:18:39.990+08:00|                         42.0|                         11.0|                         52.0|                         19.0|
+|2021-05-11T22:18:44.995+08:00|                         78.0|                         38.0|                        123.0|                         52.0|
+|2021-05-11T22:18:49.999+08:00|                        137.0|                        172.0|                        135.0|                        193.0|
+|2021-05-11T22:18:55.003+08:00|                         16.0|                        124.0|                        183.0|                         18.0|
++-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+````
+
+
+##### Result time series path configuration example
+
+For the above original time series, if the user specifies that the query aggregation level is `2`, the aggregation function is `avg`,
+The user can specify only the last node name of the generated time series in the `INTO` clause. If the user specifies it as `temperature_avg`, the full path generated by the system will be `root.${1}.${2}.temperature_avg` .
+The user can also specify the full path in the `INTO` clause, and the user can specify it as `root.${1}.${2}.temperature_avg`, `root.ln_cq.${2}.temperature_avg`, `root.${1}_cq.${2}.temperature_avg`, `root.${1}.${2}_cq.temperature_avg` etc.,
+It can also be specified as `root.${2}.${1}.temperature_avg` and others as needed.
+It should be noted that the `x` in `${x}` should be greater than or equal to `1` and less than or equal to the value of `<level>`
+(If `<level>` is not specified, it should be less than or equal to the length of `<path_prefix>`). In the above example, `x` should be less than or equal to `2`.
+
+##### Create `cq1`
+````
+CREATE CONTINUOUS QUERY cq1 BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.* GROUP BY time(10s) END
+````
+
+Query the maximum value of `root.ln.*.*.temperature` in the previous 10s every 10s (the results are grouped by 10s),
+ and the results will be written to `root.${1}.${2}.${3}.temperature_max`,
+As a result, 4 new time series will be generated.
+````
++---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|                       timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
++---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|root.ln.wf02.wt02.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf02.wt01.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.wt02.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.wt01.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
++---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+````
+````
++-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
+|                         Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max|
++-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
+|2021-05-11T22:18:16.964+08:00|                            122.0|                             68.0|                             68.0|                            103.0|
+|2021-05-11T22:18:26.964+08:00|                            182.0|                            113.0|                             59.0|                            181.0|
+|2021-05-11T22:18:36.964+08:00|                             78.0|                             38.0|                            123.0|                             52.0|
+|2021-05-11T22:18:46.964+08:00|                            137.0|                            172.0|                            183.0|                            193.0|
++-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
+````
+##### Create `cq2`
+````
+CREATE CONTINUOUS QUERY cq2 RESAMPLE EVERY 20s FOR 20s BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.*.* GROUP BY time(10s), level=2 END
+````
+Query the average value of `root.ln.*.*.temperature` in the previous 20s every 20s (the results are grouped by 10s),
+ and the results will be written to `root.${1}.${2}.temperature_avg`,
+As a result, 2 new time series will be generated.
+Among them, `root.ln.wf02.temperature_avg` is generated by the aggregation calculation of `root.ln.wf02.wt02.temperature` and `root.ln.wf02.wt01.temperature`,
+and `root.ln.wf01.temperature_avg` is generated by the aggregation calculation of `root.ln.wf01.wt02.temperature` and `root.ln.wf01.wt01.temperature`.
+````
++----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|                  timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
++----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|root.ln.wf02.temperature_avg| null|      root.ln|  DOUBLE| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.temperature_avg| null|      root.ln|  DOUBLE| GORILLA|     SNAPPY|null|      null|
++----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+````
+````
++-----------------------------+----------------------------+----------------------------+
+|                         Time|root.ln.wf02.temperature_avg|root.ln.wf01.temperature_avg|
++-----------------------------+----------------------------+----------------------------+
+|2021-05-11T22:18:16.969+08:00|                       58.75|                        49.0|
+|2021-05-11T22:18:26.969+08:00|                        89.0|                      112.25|
+|2021-05-11T22:18:36.969+08:00|                       42.25|                        61.5|
+|2021-05-11T22:18:46.969+08:00|                      112.25|                      132.25|
++-----------------------------+----------------------------+----------------------------+
+````
+##### Create `cq3`
+````
+CREATE CONTINUOUS QUERY cq3 RESAMPLE EVERY 20s FOR 20s BEGIN SELECT avg(temperature) INTO root.ln_cq.${2}.temperature_avg FROM root.ln.*.* GROUP BY time(10s), level=2 END
+````
+
+The query mode is the same as `cq2`,
+and the results will be written to `root.ln_cq.${2}.temperature_avg`.
+As a result, 2 new time series will be generated.
+Among them, `root.ln_cq.wf02.temperature_avg` is generated by the aggregation calculation of `root.ln.wf02.wt02.temperature` and `root.ln.wf02.wt01.temperature`,
+and `root.ln_cq.wf01.temperature_avg` is generated by the aggregation calculation of `root.ln.wf01.wt02.temperature` and `root.ln.wf01.wt01.temperature`.
+    
+````
++-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|                     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
++-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|root.ln_cq.wf02.temperature_avg| null|   root.ln_cq|  DOUBLE| GORILLA|     SNAPPY|null|      null|
+|root.ln_cq.wf01.temperature_avg| null|   root.ln_cq|  DOUBLE| GORILLA|     SNAPPY|null|      null|
++-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+````
+````
++-----------------------------+-------------------------------+-------------------------------+
+|                         Time|root.ln_cq.wf02.temperature_avg|root.ln_cq.wf01.temperature_avg|
++-----------------------------+-------------------------------+-------------------------------+
+|2021-05-11T22:18:16.971+08:00|                          58.75|                           49.0|
+|2021-05-11T22:18:26.971+08:00|                           89.0|                         112.25|
+|2021-05-11T22:18:36.971+08:00|                          42.25|                           61.5|
+|2021-05-11T22:18:46.971+08:00|                         112.25|                         132.25|
++-----------------------------+-------------------------------+-------------------------------+
+````
+
+### Show CQ Information
+#### Syntax
+````
+SHOW (CONTINUOUS QUERIES | CQS) 
+````
+
+#### Example Result
+````
++-------+--------------+------------+----------------------------------------------------------------------------------------+-----------------------------------+
+|cq name|every interval|for interval|                                                                               query sql|                        target path|
++-------+--------------+------------+----------------------------------------------------------------------------------------+-----------------------------------+
+|    cq1|         10000|       10000|     select max_value(temperature) from root.ln.*.* group by ([now() - 10s, now()), 10s)|root.${1}.${2}.${3}.temperature_max|
+|    cq3|         20000|       20000|select avg(temperature) from root.ln.*.* group by ([now() - 20s, now()), 10s), level = 2|    root.ln_cq.${2}.temperature_avg|
+|    cq2|         20000|       20000|select avg(temperature) from root.ln.*.* group by ([now() - 20s, now()), 10s), level = 2|     root.${1}.${2}.temperature_avg|
++-------+--------------+------------+----------------------------------------------------------------------------------------+-----------------------------------+
+````
+
+### Drop CQ
+#### Syntax
+````
+DROP (CONTINUOUS QUERY | CQ) <cq_id> 
+````
+
+#### Example
+
+````
+DROP CONTINUOUS QUERY cq3
+````
+
+````
+DROP CQ cq3
+````
+## System Parameter Configuration
+| Name | Description | Data Type | Default Value |
+| :---------------------------------- |-------- | ---- | -----|
+| `continuous_query_execution_thread` | The number of threads in the thread pool that executes continuous query tasks | int | max(1, CPU core number / 2)|
+| `max_pending_continuous_query_tasks` | The maximum number of continuous query tasks pending in queue | int | 64|
+| `continuous_query_min_every_interval` | The minimum value of the continuous query execution time interval | duration | 1s|
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Administration-Management/Administration.md b/docs/zh/UserGuide/Administration-Management/Administration.md
index d72d46a..e04fd71 100644
--- a/docs/zh/UserGuide/Administration-Management/Administration.md
+++ b/docs/zh/UserGuide/Administration-Management/Administration.md
@@ -154,6 +154,8 @@ Msg: The statement is executed successfully.
 |DROP_TRIGGER|卸载触发器。路径无关|
 |START_TRIGGER|启动触发器。路径无关|
 |STOP_TRIGGER|停止触发器。路径无关|
+|CREATE_CONTINUOUS_QUERY|创建连续查询。路径无关|
+|DROP_CONTINUOUS_QUERY|卸载连续查询。路径无关|
 
 ### 用户名限制
 
diff --git a/docs/zh/UserGuide/Advanced-Features/Continuous-Query.md b/docs/zh/UserGuide/Advanced-Features/Continuous-Query.md
new file mode 100644
index 0000000..194c112
--- /dev/null
+++ b/docs/zh/UserGuide/Advanced-Features/Continuous-Query.md
@@ -0,0 +1,232 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# 连续查询(Continuous Query, CQ)
+
+我们可以通过 SQL 语句注册、或卸载一个 CQ 实例,以及查询到所有已经注册的 CQ 配置信息。
+
+## SQL 语句
+
+### 创建 CQ
+
+#### 语法
+
+```sql
+CREATE CONTINUOUS QUERY <cq_id> 
+[RESAMPLE EVERY <every_interval> FOR <for_interval>] 
+BEGIN 
+SELECT <function>(<path_suffix>) INTO <full_path> | <node_name>
+FROM <path_prefix>
+GROUP BY time(<group_by_interval>) [, level = <level>] 
+END
+```
+
+其中:
+
+* `<cq_id>` 指定 CQ 全局唯一的 id。
+* `<every_interval>` 指定查询执行时间间隔,支持 ns、us、ms、s、m、h、d、w 等单位,其值不应小于用户所配置的 `continuous_query_min_every_interval` 值。可选择指定。
+* `<for_interval>` 指定每次查询的窗口大小,即查询时间范围为`[now() - <for_interval>, now())`,其中 `now()` 指查询时的时间戳。支持 ns、us、ms、s、m、h
+  、d、w 等单位。可选择指定。 
+* `<function>` 指定聚合函数,目前支持 `count`, `sum`, `avg`, `last_value`, `first_value`, `min_time`, `max_time`, `min_value`, `max_value` 等。
+* `<path_prefix>` 与 `<path_suffix>` 拼接成完整的查询原时间序列。
+* `<full_path>` 或 `<node_name>` 指定将查询出的数据写入的结果序列路径。
+* `<group_by_interval>` 指定时间分组长度,支持 ns、us、ms、s、m、h
+  、d、w、mo、y 等单位。
+* `<level>`指按照序列第 `<level>` 层分组,将第 `<level>` 层以下的所有序列聚合。Group By Level 语句的具体语义及 `<level>` 的定义见 [路径层级分组聚合](../IoTDB-SQL-Language/DML-Data-Manipulation-Language.md)。
+
+
+注:
+
+* `<for_interval>`,`<every_interval>` 可选择指定。如果用户没有指定其中的某一项,则未指定项的值按照`<group_by_interval>` 处理。
+    * `<every_interval>`,`<for_interval>`,`<group_by_interval>` 的值均应大于 0。
+    * `<group_by_interval>` 的值应小于`<for_interval>`的值,否则系统会按照等于`<for_interval>`的值处理。 
+    * 用户应当结合实际需求指定合适的 `<for_interval>` 与 `<every_interval>`。
+      * 若 `<for_interval>` 大于 `<every_interval>`,每次的查询窗口会有部分数据重叠,从查询性能角度这种配置不被建议。
+      * 若 `<for_interval>` 小于 `<every_interval>`,每次的查询窗口之间可能会有未覆盖到的数据。
+*  对于结果序列路径
+     * 用户可以选择指定`<full_path>`,即以 `root` 开头的完整的时间序列路径,用户可以在路径中使用 `${x}` 变量来表示原始时间序列中 `level = x` 的节点名称,`x`应当大于等于 1 且小于等于 `<level>` 值
+       (若未指定 `level`,则应小于等于 `<path_prefix>` 长度)。
+    * 用户也可以仅指定`<node_name>`,即生成时间序列路径的最后一个结点名。
+      * 若用户指定  `<level> = l`,则系统生成的结果时间序列路径为 `root.${1}. ... .${l}.<node_name>`
+      * 若用户未指定 `<level>`,令原始时间序列最大层数为 `L`, 
+      则系统生成的结果时间序列路径为 `root.${1}. ... .${L - 1}.<node_name>`。
+
+
+
+#### 示例
+
+##### 原始时间序列
+````
++-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|                   timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
++-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|root.ln.wf02.wt02.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf02.wt01.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.wt02.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.wt01.temperature| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
++-----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+````
+
+
+````
++-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|                         Time|root.ln.wf02.wt02.temperature|root.ln.wf02.wt01.temperature|root.ln.wf01.wt02.temperature|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+|2021-05-11T22:18:14.598+08:00|                        121.0|                         72.0|                        183.0|                        115.0|
+|2021-05-11T22:18:19.941+08:00|                          0.0|                         68.0|                         68.0|                        103.0|
+|2021-05-11T22:18:24.949+08:00|                        122.0|                         45.0|                         11.0|                         14.0|
+|2021-05-11T22:18:29.967+08:00|                         47.0|                         14.0|                         59.0|                        181.0|
+|2021-05-11T22:18:34.979+08:00|                        182.0|                        113.0|                         29.0|                        180.0|
+|2021-05-11T22:18:39.990+08:00|                         42.0|                         11.0|                         52.0|                         19.0|
+|2021-05-11T22:18:44.995+08:00|                         78.0|                         38.0|                        123.0|                         52.0|
+|2021-05-11T22:18:49.999+08:00|                        137.0|                        172.0|                        135.0|                        193.0|
+|2021-05-11T22:18:55.003+08:00|                         16.0|                        124.0|                        183.0|                         18.0|
++-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
+````
+
+##### 结果序列配置举例说明
+
+对于以上原始时间序列,若用户指定查询聚合层级为 `2`,聚合函数为 `avg`,
+用户可以在 `INTO` 语句中仅指定生成序列的最后一个结点名,若用户将其指定为 `temperature_avg`,则系统生成的完整路径为 `root.${1}.${2}.temperature_avg`。
+用户也可以在 `INTO` 语句中指定完整写入路径,用户可将其指定为 `root.${1}.${2}.temperature_avg`、`root.ln_cq.${2}.temperature_avg`、`root.${1}_cq.${2}.temperature_avg`、`root.${1}.${2}_cq.temperature_avg`等,
+也可以按需要指定为 `root.${2}.${1}.temperature_avg` 等其它形式。
+需要注意的是,`${x}` 中的 `x` 应当大于等于 `1` 且小于等于 `<level>` 值
+(若未指定 `<level>`,则应小于等于 `<path_prefix>` 层级)。在上例中,`x` 应当小于等于 `2`。
+
+
+##### 创建 `cq1`
+````
+CREATE CONTINUOUS QUERY cq1 BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.* GROUP BY time(10s) END
+````
+
+每隔 10s 查询 `root.ln.*.*.temperature` 在前 10s 内的最大值(结果以10s为一组),
+将结果写入到 `root.${1}.${2}.${3}.temperature_max` 中,
+结果将产生4条新序列:
+````
++---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|                       timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
++---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|root.ln.wf02.wt02.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf02.wt01.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.wt02.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.wt01.temperature_max| null|      root.ln|   FLOAT| GORILLA|     SNAPPY|null|      null|
++---------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+````
+````
++-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
+|                         Time|root.ln.wf02.wt02.temperature_max|root.ln.wf02.wt01.temperature_max|root.ln.wf01.wt02.temperature_max|root.ln.wf01.wt01.temperature_max|
++-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
+|2021-05-11T22:18:16.964+08:00|                            122.0|                             68.0|                             68.0|                            103.0|
+|2021-05-11T22:18:26.964+08:00|                            182.0|                            113.0|                             59.0|                            181.0|
+|2021-05-11T22:18:36.964+08:00|                             78.0|                             38.0|                            123.0|                             52.0|
+|2021-05-11T22:18:46.964+08:00|                            137.0|                            172.0|                            183.0|                            193.0|
++-----------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
+````
+##### 创建 `cq2`
+````
+CREATE CONTINUOUS QUERY cq2 RESAMPLE EVERY 20s FOR 20s BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.*.* GROUP BY time(10s), level=2 END
+````
+
+每隔 20s 查询 `root.ln.*.*.temperature` 在前 20s 内的平均值(结果以10s为一组,按照第2层节点分组),
+将结果写入到 `root.${1}.${2}.temperature_avg` 中。
+结果将产生如下两条新序列,
+其中 `root.ln.wf02.temperature_avg` 由 `root.ln.wf02.wt02.temperature` 和 `root.ln.wf02.wt01.temperature` 聚合计算生成,
+`root.ln.wf01.temperature_avg` 由 `root.ln.wf01.wt02.temperature` 和 `root.ln.wf01.wt01.temperature` 聚合计算生成。
+
+````
++----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|                  timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
++----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|root.ln.wf02.temperature_avg| null|      root.ln|  DOUBLE| GORILLA|     SNAPPY|null|      null|
+|root.ln.wf01.temperature_avg| null|      root.ln|  DOUBLE| GORILLA|     SNAPPY|null|      null|
++----------------------------+-----+-------------+--------+--------+-----------+----+----------+
+````
+````
++-----------------------------+----------------------------+----------------------------+
+|                         Time|root.ln.wf02.temperature_avg|root.ln.wf01.temperature_avg|
++-----------------------------+----------------------------+----------------------------+
+|2021-05-11T22:18:16.969+08:00|                       58.75|                        49.0|
+|2021-05-11T22:18:26.969+08:00|                        89.0|                      112.25|
+|2021-05-11T22:18:36.969+08:00|                       42.25|                        61.5|
+|2021-05-11T22:18:46.969+08:00|                      112.25|                      132.25|
++-----------------------------+----------------------------+----------------------------+
+````
+##### 创建 `cq3`
+````
+CREATE CONTINUOUS QUERY cq3 RESAMPLE EVERY 20s FOR 20s BEGIN SELECT avg(temperature) INTO root.ln_cq.${2}.temperature_avg FROM root.ln.*.* GROUP BY time(10s), level=2 END
+````
+查询模式与 cq2 相同,在这个例子中,用户自行指定结果写入到 `root.ln_cq.${2}.temperature_avg` 中。
+结果将产生如下两条新序列,
+其中 `root.ln_cq.wf02.temperature_avg` 由 `root.ln.wf02.wt02.temperature` 和 `root.ln.wf02.wt01.temperature` 聚合计算生成,
+`root.ln_cq.wf01.temperature_avg` 由 `root.ln.wf01.wt02.temperature` 和 `root.ln.wf01.wt01.temperature` 聚合计算生成。
+````
++-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|                     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
++-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+|root.ln_cq.wf02.temperature_avg| null|   root.ln_cq|  DOUBLE| GORILLA|     SNAPPY|null|      null|
+|root.ln_cq.wf01.temperature_avg| null|   root.ln_cq|  DOUBLE| GORILLA|     SNAPPY|null|      null|
++-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
+````
+````
++-----------------------------+-------------------------------+-------------------------------+
+|                         Time|root.ln_cq.wf02.temperature_avg|root.ln_cq.wf01.temperature_avg|
++-----------------------------+-------------------------------+-------------------------------+
+|2021-05-11T22:18:16.971+08:00|                          58.75|                           49.0|
+|2021-05-11T22:18:26.971+08:00|                           89.0|                         112.25|
+|2021-05-11T22:18:36.971+08:00|                          42.25|                           61.5|
+|2021-05-11T22:18:46.971+08:00|                         112.25|                         132.25|
++-----------------------------+-------------------------------+-------------------------------+
+````
+
+### 展示 CQ 信息
+#### 语法
+````
+SHOW CONTINUOUS QUERIES 
+````
+#### 结果示例
+````
++-------+--------------+------------+----------------------------------------------------------------------------------------+-----------------------------------+
+|cq name|every interval|for interval|                                                                               query sql|                        target path|
++-------+--------------+------------+----------------------------------------------------------------------------------------+-----------------------------------+
+|    cq1|         10000|       10000|     select max_value(temperature) from root.ln.*.* group by ([now() - 10s, now()), 10s)|root.${1}.${2}.${3}.temperature_max|
+|    cq3|         20000|       20000|select avg(temperature) from root.ln.*.* group by ([now() - 20s, now()), 10s), level = 2|    root.ln_cq.${2}.temperature_avg|
+|    cq2|         20000|       20000|select avg(temperature) from root.ln.*.* group by ([now() - 20s, now()), 10s), level = 2|     root.${1}.${2}.temperature_avg|
++-------+--------------+------------+----------------------------------------------------------------------------------------+-----------------------------------+
+````
+### 删除 CQ
+#### 语法
+````
+DROP CONTINUOUS QUERY <cq_id> 
+````
+#### 示例
+````
+DROP CONTINUOUS QUERY cq3
+````
+
+## 系统参数配置
+| 参数名          | 描述           |  数据类型| 默认值 |
+| :---------------------------------- |-------- | ----| -----|
+| `continuous_query_execution_thread` | 执行连续查询任务的线程池的线程数 | int | max(1, CPU 核数 / 2)|
+| `max_pending_continuous_query_tasks` | 队列中连续查询最大任务堆积数 | int | 64|
+| `continuous_query_min_every_interval` | 连续查询执行时间间隔的最小值 | duration | 1s|
+
+
+
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 35423b0..c27c17d 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -73,6 +73,7 @@ public class SessionExample {
     insertTablet();
     insertTablets();
     insertRecords();
+    createAndDropContinuousQueries();
     nonQuery();
     query();
     queryWithTimeout();
@@ -96,6 +97,26 @@ public class SessionExample {
     session.close();
   }
 
+  private static void createAndDropContinuousQueries()
+      throws StatementExecutionException, IoTDBConnectionException {
+    session.executeNonQueryStatement(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT max_value(s1) INTO temperature_max FROM root.sg1.* "
+            + "GROUP BY time(10s) END");
+    session.executeNonQueryStatement(
+        "CREATE CONTINUOUS QUERY cq2 "
+            + "BEGIN SELECT count(s2) INTO temperature_cnt FROM root.sg1.* "
+            + "GROUP BY time(10s), level=1 END");
+    session.executeNonQueryStatement(
+        "CREATE CONTINUOUS QUERY cq3 "
+            + "RESAMPLE EVERY 20s FOR 20s "
+            + "BEGIN SELECT avg(s3) INTO temperature_avg FROM root.sg1.* "
+            + "GROUP BY time(10s), level=1 END");
+    session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq1");
+    session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq2");
+    session.executeNonQueryStatement("DROP CONTINUOUS QUERY cq3");
+  }
+
   private static void createTimeseries()
       throws IoTDBConnectionException, StatementExecutionException {
 
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 9dd5434..22706da 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -798,6 +798,24 @@ timestamp_precision=ms
 # max_pending_window_evaluation_tasks=64
 
 ####################
+### Continuous Query Configuration
+####################
+
+# How many thread will be set up to perform continuous queries. When <= 0, use max(1, CPU core number / 2).
+# Datatype: int
+# continuous_query_execution_thread=2
+
+# Maximum number of continuous query tasks that can be pending for execution. When <= 0, the value is
+# 64 by default.
+# Datatype: int
+# max_pending_continuous_query_tasks=64
+
+# Minimum every interval to perform continuous query.
+# The every interval of continuous query instances should not be lower than this limit.
+# Datatype: duration
+# continuous_query_min_every_interval=1s
+
+####################
 ### Index Configuration
 ####################
 
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 1b1bdc7..c0f86b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -73,7 +73,6 @@ public class AuthorityChecker {
     } else {
       return checkOnePath(username, null, permission);
     }
-
     return true;
   }
 
@@ -157,6 +156,10 @@ public class AuthorityChecker {
         return PrivilegeType.START_TRIGGER.ordinal();
       case STOP_TRIGGER:
         return PrivilegeType.STOP_TRIGGER.ordinal();
+      case CREATE_CONTINUOUS_QUERY:
+        return PrivilegeType.CREATE_CONTINUOUS_QUERY.ordinal();
+      case DROP_CONTINUOUS_QUERY:
+        return PrivilegeType.DROP_CONTINUOUS_QUERY.ordinal();
       default:
         logger.error("Unrecognizable operator type ({}) for AuthorityChecker.", type);
         return -1;
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/entity/PrivilegeType.java b/server/src/main/java/org/apache/iotdb/db/auth/entity/PrivilegeType.java
index abf7b52..66613ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/entity/PrivilegeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/entity/PrivilegeType.java
@@ -46,6 +46,8 @@ public enum PrivilegeType {
   DROP_TRIGGER,
   START_TRIGGER,
   STOP_TRIGGER,
+  CREATE_CONTINUOUS_QUERY,
+  DROP_CONTINUOUS_QUERY,
   ALL;
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 87a21c0..0850b75 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -45,6 +45,7 @@ public enum ThreadName {
   TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
   QUERY_SERVICE("Query"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
+  CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
   CLUSTER_INFO_SERVICE("ClusterInfoClient");
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5ba3a34..6f7820c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -530,6 +530,24 @@ public class IoTDBConfig {
    */
   private int compactionThreadNum = 10;
 
+  /*
+   * How many thread will be set up to perform continuous queries. When <= 0, use max(1, CPU core number / 2).
+   */
+  private int continuousQueryThreadNum =
+      Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+
+  /*
+   * Maximum number of continuous query tasks that can be pending for execution. When <= 0, the value is
+   * 64 by default.
+   */
+  private int maxPendingContinuousQueryTasks = 64;
+
+  /*
+   * Minimum every interval to perform continuous query.
+   * The every interval of continuous query instances should not be lower than this limit.
+   */
+  private long continuousQueryMinimumEveryInterval = 1000;
+
   private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM;
 
   /** Default system file storage is in local file system (unsupported) */
@@ -1423,6 +1441,30 @@ public class IoTDBConfig {
     this.compactionThreadNum = compactionThreadNum;
   }
 
+  public int getContinuousQueryThreadNum() {
+    return continuousQueryThreadNum;
+  }
+
+  public void setContinuousQueryThreadNum(int continuousQueryThreadNum) {
+    this.continuousQueryThreadNum = continuousQueryThreadNum;
+  }
+
+  public int getMaxPendingContinuousQueryTasks() {
+    return maxPendingContinuousQueryTasks;
+  }
+
+  public void setMaxPendingContinuousQueryTasks(int maxPendingContinuousQueryTasks) {
+    this.maxPendingContinuousQueryTasks = maxPendingContinuousQueryTasks;
+  }
+
+  public long getContinuousQueryMinimumEveryInterval() {
+    return continuousQueryMinimumEveryInterval;
+  }
+
+  public void setContinuousQueryMinimumEveryInterval(long minimumEveryInterval) {
+    this.continuousQueryMinimumEveryInterval = minimumEveryInterval;
+  }
+
   public int getMergeWriteThroughputMbPerSec() {
     return mergeWriteThroughputMbPerSec;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 6f26726..ef68816 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -93,6 +93,12 @@ public class IoTDBConstant {
   public static final String COLUMN_FUNCTION_TYPE = "function type";
   public static final String COLUMN_FUNCTION_CLASS = "class name (UDF)";
 
+  public static final String COLUMN_CONTINUOUS_QUERY_NAME = "cq name";
+  public static final String COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL = "every interval";
+  public static final String COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL = "for interval";
+  public static final String COLUMN_CONTINUOUS_QUERY_TARGET_PATH = "target path";
+  public static final String COLUMN_CONTINUOUS_QUERY_QUERY_SQL = "query sql";
+
   public static final String FUNCTION_TYPE_NATIVE = "native";
   public static final String FUNCTION_TYPE_BUILTIN_UDAF = "built-in UDAF";
   public static final String FUNCTION_TYPE_BUILTIN_UDTF = "built-in UDTF";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e3f5f15..025d604 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.conf;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -487,6 +488,31 @@ public class IoTDBDescriptor {
           Integer.parseInt(
               properties.getProperty(
                   "compaction_thread_num", Integer.toString(conf.getCompactionThreadNum()))));
+
+      conf.setContinuousQueryThreadNum(
+          Integer.parseInt(
+              properties.getProperty(
+                  "continuous_query_thread_num",
+                  Integer.toString(conf.getContinuousQueryThreadNum()))));
+
+      if (conf.getContinuousQueryThreadNum() <= 0) {
+        conf.setContinuousQueryThreadNum(Runtime.getRuntime().availableProcessors() / 2);
+      }
+
+      conf.setMaxPendingContinuousQueryTasks(
+          Integer.parseInt(
+              properties.getProperty(
+                  "max_pending_continuous_query_tasks",
+                  Integer.toString(conf.getMaxPendingContinuousQueryTasks()))));
+
+      if (conf.getMaxPendingContinuousQueryTasks() <= 0) {
+        conf.setMaxPendingContinuousQueryTasks(64);
+      }
+
+      conf.setContinuousQueryMinimumEveryInterval(
+          DatetimeUtils.convertDurationStrToLong(
+              properties.getProperty("continuous_query_minimum_every_interval", "1s")));
+
       conf.setMergeWriteThroughputMbPerSec(
           Integer.parseInt(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
new file mode 100644
index 0000000..22e90ca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.cq;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.ShutdownException;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.ServiceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class ContinuousQueryService implements IService {
+
+  private static long CHECK_INTERVAL =
+      IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+
+  private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryService.class);
+
+  private final ConcurrentHashMap<String, CreateContinuousQueryPlan> continuousQueryPlans =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentHashMap<String, Long> nextExecutionTimestamps = new ConcurrentHashMap<>();
+
+  private final ReentrantLock registrationLock = new ReentrantLock();
+
+  private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
+
+  private ScheduledExecutorService checkThread;
+
+  protected static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
+      ContinuousQueryTaskPoolManager.getInstance();
+
+  private ContinuousQueryService() {}
+
+  public static ContinuousQueryService getInstance() {
+    return INSTANCE;
+  }
+
+  public void acquireRegistrationLock() {
+    registrationLock.lock();
+  }
+
+  public void releaseRegistrationLock() {
+    registrationLock.unlock();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.CONTINUOUS_QUERY_SERVICE;
+  }
+
+  @Override
+  public void start() {
+
+    for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
+      long durationFromCreation = DatetimeUtils.currentTime() - plan.getCreationTimestamp();
+      long nextExecutionTimestamp =
+          plan.getCreationTimestamp()
+              + plan.getEveryInterval()
+                  * (durationFromCreation / plan.getEveryInterval()
+                      + ((durationFromCreation % plan.getEveryInterval() == 0) ? 0 : 1));
+      nextExecutionTimestamps.put(plan.getContinuousQueryName(), nextExecutionTimestamp);
+    }
+
+    checkThread = Executors.newSingleThreadScheduledExecutor();
+    checkThread.scheduleAtFixedRate(
+        this::checkAndSubmitTasks,
+        0,
+        CHECK_INTERVAL,
+        DatetimeUtils.timestampPrecisionStringToTimeUnit(
+            IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()));
+
+    logger.info("Continuous query service started.");
+  }
+
+  @Override
+  public void stop() {
+    if (checkThread != null) {
+      checkThread.shutdown();
+      try {
+        checkThread.awaitTermination(600, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        logger.warn("Check thread still doesn't exit after 60s");
+        checkThread.shutdownNow();
+      }
+    }
+  }
+
+  @Override
+  public void shutdown(long milliseconds) throws ShutdownException {
+    stop();
+  }
+
+  private void checkAndSubmitTasks() {
+    long currentTimestamp = DatetimeUtils.currentTime();
+
+    for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
+      long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
+      while (currentTimestamp >= nextExecutionTimestamp) {
+        TASK_POOL_MANAGER.submit(new ContinuousQueryTask(plan, nextExecutionTimestamp));
+        nextExecutionTimestamp += plan.getEveryInterval();
+      }
+      nextExecutionTimestamps.replace(plan.getContinuousQueryName(), nextExecutionTimestamp);
+    }
+  }
+
+  public boolean register(CreateContinuousQueryPlan plan, boolean writeLog)
+      throws ContinuousQueryException {
+
+    acquireRegistrationLock();
+
+    try {
+      if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+        throw new ContinuousQueryException(
+            String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
+      }
+      if (writeLog) {
+        IoTDB.metaManager.createContinuousQuery(plan);
+      }
+      doRegister(plan);
+    } catch (ContinuousQueryException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new ContinuousQueryException(e.getMessage());
+    } finally {
+      releaseRegistrationLock();
+    }
+    return true;
+  }
+
+  private void doRegister(CreateContinuousQueryPlan plan) {
+    continuousQueryPlans.put(plan.getContinuousQueryName(), plan);
+    nextExecutionTimestamps.put(plan.getContinuousQueryName(), plan.getCreationTimestamp());
+  }
+
+  public void deregisterAll() throws ContinuousQueryException {
+    for (String cqName : continuousQueryPlans.keySet()) {
+      deregister(new DropContinuousQueryPlan(cqName));
+    }
+  }
+
+  public boolean deregister(DropContinuousQueryPlan plan) throws ContinuousQueryException {
+
+    acquireRegistrationLock();
+
+    try {
+      if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+        throw new ContinuousQueryException(
+            String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
+      }
+      IoTDB.metaManager.dropContinuousQuery(plan);
+      doDeregister(plan);
+    } catch (ContinuousQueryException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new ContinuousQueryException(e.getMessage());
+    } finally {
+      releaseRegistrationLock();
+    }
+
+    return true;
+  }
+
+  private void doDeregister(DropContinuousQueryPlan plan) {
+    continuousQueryPlans.remove(plan.getContinuousQueryName());
+    nextExecutionTimestamps.remove(plan.getContinuousQueryName());
+  }
+
+  public List<ShowContinuousQueriesResult> getShowContinuousQueriesResultList() {
+
+    List<ShowContinuousQueriesResult> results = new ArrayList<>(continuousQueryPlans.size());
+
+    for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
+
+      results.add(
+          new ShowContinuousQueriesResult(
+              plan.getQuerySql(),
+              plan.getContinuousQueryName(),
+              plan.getTargetPath(),
+              plan.getEveryInterval(),
+              plan.getForInterval()));
+    }
+
+    return results;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
new file mode 100644
index 0000000..656739a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.cq;
+
+import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
+import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ContinuousQueryTask extends WrappedRunnable {
+
+  private static final int FETCH_SIZE = 10000;
+  private static final int BATCH_SIZE = 10000;
+
+  private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryTask.class);
+
+  // To execute the query plan
+  private static PlanExecutor planExecutor;
+
+  static {
+    try {
+      planExecutor = new PlanExecutor();
+    } catch (QueryProcessException e) {
+      logger.error(e.getMessage());
+    }
+  }
+
+  // To save the continuous query info
+  private final CreateContinuousQueryPlan plan;
+  // To transform query operator to query plan
+  private static final Planner planner = new Planner();
+  // Next timestamp to execute a query
+  private long windowEndTimestamp;
+
+  private static final Pattern pattern = Pattern.compile("\\$\\{\\w+}");
+
+  public ContinuousQueryTask(CreateContinuousQueryPlan plan, long windowEndTimestamp) {
+    this.plan = plan;
+    this.windowEndTimestamp = windowEndTimestamp;
+  }
+
+  @Override
+  public void runMayThrow()
+      throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
+          QueryFilterOptimizationException, MetadataException {
+
+    GroupByTimePlan queryPlan = generateQueryPlan();
+
+    if (queryPlan.getDeduplicatedPaths().isEmpty()) {
+      logger.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
+      return;
+    }
+
+    QueryDataSet result = doQuery(queryPlan);
+
+    if (result == null || result.getPaths().size() == 0) {
+      logger.info(plan.getContinuousQueryName() + ": query result empty");
+      return;
+    }
+
+    doInsert(result, queryPlan);
+  }
+
+  public void onRejection() {
+    logger.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
+  }
+
+  private GroupByTimePlan generateQueryPlan() throws QueryProcessException {
+
+    QueryOperator queryOperator = plan.getQueryOperator();
+
+    // To handle the time series meta changes in different queries, i.e. creation & deletion,
+    // we need to apply concatenation optimization to SelectComponent before every query.
+    // Since the concatenation optimization will change resultColumns information of
+    // SelectComponent,
+    // we need to save one copy of the original SelectComponent.
+    SelectComponent selectComponentCopy = new SelectComponent(queryOperator.getSelectComponent());
+
+    GroupByTimePlan queryPlan = planner.cqQueryOperatorToGroupByTimePlan(queryOperator, FETCH_SIZE);
+
+    queryOperator.setSelectComponent(selectComponentCopy);
+
+    queryPlan.setStartTime(windowEndTimestamp - plan.getForInterval());
+    queryPlan.setEndTime(windowEndTimestamp);
+
+    return queryPlan;
+  }
+
+  private QueryDataSet doQuery(GroupByTimePlan queryPlan)
+      throws StorageEngineException, QueryFilterOptimizationException, MetadataException,
+          IOException, InterruptedException, QueryProcessException {
+    long queryId =
+        QueryResourceManager.getInstance()
+            .assignQueryId(true, FETCH_SIZE, queryPlan.getDeduplicatedPaths().size());
+
+    try {
+      return planExecutor.processQuery(queryPlan, new QueryContext(queryId));
+    } finally {
+      QueryResourceManager.getInstance().endQuery(queryId);
+    }
+  }
+
+  private void doInsert(QueryDataSet result, GroupByTimePlan queryPlan)
+      throws QueryProcessException, IOException, IllegalPathException {
+
+    int columnSize = result.getDataTypes().size();
+    TSDataType dataType =
+        TypeInferenceUtils.getAggrDataType(
+            queryPlan.getAggregations().get(0), queryPlan.getDataTypes().get(0));
+
+    InsertTabletPlan[] insertTabletPlans = generateInsertTabletPlans(columnSize, result, dataType);
+
+    int batchSize =
+        (int)
+            Math.min(
+                BATCH_SIZE,
+                Math.ceil(
+                    (float) plan.getForInterval()
+                        / ((GroupByClauseComponent)
+                                plan.getQueryOperator().getSpecialClauseComponent())
+                            .getUnit()));
+
+    Object[][] columns = constructColumns(columnSize, batchSize, dataType);
+    long[][] timestamps = new long[columnSize][batchSize];
+    int[] rowNums = new int[columnSize];
+
+    boolean hasNext = true;
+
+    while (hasNext) {
+      int rowNum = 0;
+
+      while (++rowNum <= batchSize) {
+        if (!result.hasNextWithoutConstraint()) {
+          hasNext = false;
+          break;
+        }
+        RowRecord record = result.nextWithoutConstraint();
+        fillColumns(columns, dataType, record, rowNums, timestamps);
+      }
+
+      for (int i = 0; i < columnSize; i++) {
+        if (rowNums[i] > 0) {
+          insertTabletPlans[i].setTimes(timestamps[i]);
+          insertTabletPlans[i].setColumns(columns[i]);
+          insertTabletPlans[i].setRowCount(rowNums[i]);
+          planExecutor.insertTablet(insertTabletPlans[i]);
+        }
+      }
+    }
+  }
+
+  private InsertTabletPlan[] generateInsertTabletPlans(
+      int columnSize, QueryDataSet result, TSDataType dataType) throws IllegalPathException {
+    List<PartialPath> targetPaths = generateTargetPaths(result.getPaths());
+    InsertTabletPlan[] insertTabletPlans = new InsertTabletPlan[columnSize];
+    String[] measurements = new String[] {targetPaths.get(0).getMeasurement()};
+    List<Integer> dataTypes = Collections.singletonList(dataType.ordinal());
+
+    for (int i = 0; i < columnSize; i++) {
+      insertTabletPlans[i] =
+          new InsertTabletPlan(
+              new PartialPath(targetPaths.get(i).getDevice()), measurements, dataTypes);
+    }
+
+    return insertTabletPlans;
+  }
+
+  private Object[][] constructColumns(int columnSize, int fetchSize, TSDataType dataType) {
+    Object[][] columns = new Object[columnSize][1];
+    for (int i = 0; i < columnSize; i++) {
+      switch (dataType) {
+        case DOUBLE:
+          columns[i][0] = new double[fetchSize];
+          break;
+        case INT64:
+          columns[i][0] = new long[fetchSize];
+          break;
+        case INT32:
+          columns[i][0] = new int[fetchSize];
+          break;
+        case FLOAT:
+          columns[i][0] = new float[fetchSize];
+          break;
+        default:
+          break;
+      }
+    }
+    return columns;
+  }
+
+  private void fillColumns(
+      Object[][] columns,
+      TSDataType dataType,
+      RowRecord record,
+      int[] rowNums,
+      long[][] timestamps) {
+    List<Field> fields = record.getFields();
+    long ts = record.getTimestamp();
+
+    for (int i = 0; i < columns.length; i++) {
+      Field field = fields.get(i);
+      if (field != null) {
+        timestamps[i][rowNums[i]] = ts;
+        switch (dataType) {
+          case DOUBLE:
+            ((double[]) columns[i][0])[rowNums[i]] = field.getDoubleV();
+            break;
+          case INT64:
+            ((long[]) columns[i][0])[rowNums[i]] = field.getLongV();
+            break;
+          case INT32:
+            ((int[]) columns[i][0])[rowNums[i]] = field.getIntV();
+            break;
+          case FLOAT:
+            ((float[]) columns[i][0])[rowNums[i]] = field.getFloatV();
+            break;
+          default:
+        }
+
+        rowNums[i]++;
+      }
+    }
+  }
+
+  private List<PartialPath> generateTargetPaths(List<Path> rawPaths) throws IllegalPathException {
+    List<PartialPath> targetPaths = new ArrayList<>(rawPaths.size());
+    for (Path rawPath : rawPaths) {
+      targetPaths.add(new PartialPath(fillTargetPathTemplate((PartialPath) rawPath)));
+    }
+    return targetPaths;
+  }
+
+  private String fillTargetPathTemplate(PartialPath rawPath) {
+    String[] nodes = rawPath.getNodes();
+    int indexOfLeftBracket = nodes[0].indexOf("(");
+    if (indexOfLeftBracket != -1) {
+      nodes[0] = nodes[0].substring(indexOfLeftBracket + 1);
+    }
+    int indexOfRightBracket = nodes[nodes.length - 1].indexOf(")");
+    if (indexOfRightBracket != -1) {
+      nodes[nodes.length - 1] = nodes[nodes.length - 1].substring(0, indexOfRightBracket);
+    }
+    StringBuffer sb = new StringBuffer();
+    Matcher m = pattern.matcher(this.plan.getTargetPath().getFullPath());
+    while (m.find()) {
+      String param = m.group();
+      String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
+      m.appendReplacement(sb, value == null ? "" : value);
+    }
+    m.appendTail(sb);
+    return sb.toString();
+  }
+
+  public CreateContinuousQueryPlan getCreateContinuousQueryPlan() {
+    return plan;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
new file mode 100644
index 0000000..350aba8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.cq;
+
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ContinuousQueryTaskPoolManager.class);
+
+  private static final int nThreads =
+      IoTDBDescriptor.getInstance().getConfig().getContinuousQueryThreadNum();
+
+  private ContinuousQueryTaskPoolManager() {
+
+    LOGGER.info("ContinuousQueryTaskPoolManager is initializing, thread number: {}", nThreads);
+
+    pool =
+        new ThreadPoolExecutor(
+            nThreads,
+            nThreads,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(
+                IoTDBDescriptor.getInstance().getConfig().getMaxPendingContinuousQueryTasks()),
+            new IoTThreadFactory(ThreadName.CONTINUOUS_QUERY_SERVICE.getName()));
+  }
+
+  public void submit(ContinuousQueryTask task) {
+    try {
+      super.submit(task);
+    } catch (RejectedExecutionException e) {
+      task.onRejection();
+    }
+  }
+
+  @Override
+  public Logger getLogger() {
+    return LOGGER;
+  }
+
+  @Override
+  public String getName() {
+    return "continuous query task";
+  }
+
+  @Override
+  public void start() {
+    if (pool != null) {
+      return;
+    }
+
+    pool =
+        new ThreadPoolExecutor(
+            nThreads,
+            nThreads,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(
+                IoTDBDescriptor.getInstance().getConfig().getMaxPendingContinuousQueryTasks()),
+            new IoTThreadFactory(ThreadName.CONTINUOUS_QUERY_SERVICE.getName()));
+  }
+
+  public static ContinuousQueryTaskPoolManager getInstance() {
+    return ContinuousQueryTaskPoolManager.InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+      // nothing to do
+    }
+
+    private static final ContinuousQueryTaskPoolManager INSTANCE =
+        new ContinuousQueryTaskPoolManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
copy to server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
index deb3272..53108bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
@@ -16,23 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.metadata;
 
-public class MetadataOperationType {
+package org.apache.iotdb.db.exception;
 
-  private MetadataOperationType() {
-    // allowed to do nothing
-  }
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class ContinuousQueryException extends StorageEngineException {
 
-  public static final String CREATE_TIMESERIES = "0";
-  public static final String DELETE_TIMESERIES = "1";
-  public static final String SET_STORAGE_GROUP = "2";
-  public static final String CREATE_ALIGNED_TIMESERIES = "3";
-  public static final String AUTO_CREATE_DEVICE_MNODE = "4";
-  public static final String SET_TTL = "10";
-  public static final String DELETE_STORAGE_GROUP = "11";
-  public static final String CREATE_INDEX = "31";
-  public static final String DROP_INDEX = "32";
-  public static final String CHANGE_OFFSET = "12";
-  public static final String CHANGE_ALIAS = "13";
+  public ContinuousQueryException(String message) {
+    super(message, TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode());
+    this.isUserException = true;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java
index b7affe5..7e91ca2 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MLogTxtWriter.java
@@ -20,7 +20,9 @@ package org.apache.iotdb.db.metadata;
 
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
@@ -158,6 +160,29 @@ public class MLogTxtWriter implements AutoCloseable {
     channel.write(buff);
   }
 
+  public void createContinuousQuery(CreateContinuousQueryPlan plan) throws IOException {
+    String buf =
+        String.format(
+                "%s,%s,%s,%s",
+                MetadataOperationType.CREATE_CONTINUOUS_QUERY,
+                plan.getContinuousQueryName(),
+                plan.getQuerySql(),
+                plan.getTargetPath().getFullPath())
+            + LINE_SEPARATOR;
+    channel.write(ByteBuffer.wrap(buf.getBytes()));
+    lineNumber.incrementAndGet();
+  }
+
+  public void dropContinuousQuery(DropContinuousQueryPlan plan) throws IOException {
+
+    String buf =
+        String.format(
+                "%s,%s", MetadataOperationType.DROP_CONTINUOUS_QUERY, plan.getContinuousQueryName())
+            + LINE_SEPARATOR;
+    channel.write(ByteBuffer.wrap(buf.getBytes()));
+    lineNumber.incrementAndGet();
+  }
+
   public void setStorageGroup(String storageGroup) throws IOException {
     String outputStr =
         MetadataOperationType.SET_STORAGE_GROUP + "," + storageGroup + LINE_SEPARATOR;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 3859995..b873649 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -55,9 +55,11 @@ import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetUsingDeviceTemplatePlan;
@@ -299,6 +301,7 @@ public class MManager {
 
   private int applyMlog(MLogReader mLogReader) {
     int idx = 0;
+
     while (mLogReader.hasNext()) {
       PhysicalPlan plan = null;
       try {
@@ -313,6 +316,7 @@ public class MManager {
             "Can not operate cmd {} for err:", plan == null ? "" : plan.getOperatorType(), e);
       }
     }
+
     return idx;
   }
 
@@ -406,6 +410,14 @@ public class MManager {
     }
   }
 
+  public void createContinuousQuery(CreateContinuousQueryPlan plan) throws IOException {
+    logWriter.createContinuousQuery(plan);
+  }
+
+  public void dropContinuousQuery(DropContinuousQueryPlan plan) throws IOException {
+    logWriter.dropContinuousQuery(plan);
+  }
+
   public void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
     createTimeseries(plan, -1);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
index deb3272..a521acf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
@@ -35,4 +35,6 @@ public class MetadataOperationType {
   public static final String DROP_INDEX = "32";
   public static final String CHANGE_OFFSET = "12";
   public static final String CHANGE_ALIAS = "13";
+  public static final String CREATE_CONTINUOUS_QUERY = "14";
+  public static final String DROP_CONTINUOUS_QUERY = "15";
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index 4391024..b18f9a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@ -34,9 +34,11 @@ import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -135,6 +137,16 @@ public class MLogWriter implements AutoCloseable {
     putLog(deleteTimeSeriesPlan);
   }
 
+  public void createContinuousQuery(CreateContinuousQueryPlan createContinuousQueryPlan)
+      throws IOException {
+    putLog(createContinuousQueryPlan);
+  }
+
+  public void dropContinuousQuery(DropContinuousQueryPlan dropContinuousQueryPlan)
+      throws IOException {
+    putLog(dropContinuousQueryPlan);
+  }
+
   public void setStorageGroup(PartialPath storageGroup) throws IOException {
     SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
     putLog(plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 6280136..b12f8ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.WhereComponent;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.qp.strategy.LogicalChecker;
 import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
@@ -64,6 +65,15 @@ public class Planner {
     return new PhysicalGenerator().transformToPhysicalPlan(operator, fetchSize);
   }
 
+  public GroupByTimePlan cqQueryOperatorToGroupByTimePlan(QueryOperator operator, int fetchSize)
+      throws QueryProcessException {
+    // optimize the logical operator (no need to check since the operator has been checked
+    // beforehand)
+    operator = (QueryOperator) logicalOptimize(operator, fetchSize);
+    return (GroupByTimePlan) new PhysicalGenerator().transformToPhysicalPlan(operator, fetchSize);
+  }
+
+  /** convert raw data query to physical plan directly */
   public PhysicalPlan rawDataQueryReqToPhysicalPlan(
       TSRawDataQueryReq rawDataQueryReq, ZoneId zoneId)
       throws IllegalPathException, QueryProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 97123a1..7509bf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -175,6 +175,10 @@ public class SQLConstant {
   public static final int TOK_SHOW_TRIGGERS = 104;
   public static final int TOK_LOCK_INFO = 105;
 
+  public static final int TOK_CONTINUOUS_QUERY_CREATE = 106;
+  public static final int TOK_CONTINUOUS_QUERY_DROP = 107;
+  public static final int TOK_SHOW_CONTINUOUS_QUERIES = 108;
+
   public static final Map<Integer, String> tokenNames = new HashMap<>();
 
   public static String[] getSingleRootArray() {
@@ -239,6 +243,10 @@ public class SQLConstant {
     tokenNames.put(TOK_TRIGGER_START, "TOK_TRIGGER_START");
     tokenNames.put(TOK_TRIGGER_STOP, "TOK_TRIGGER_STOP");
     tokenNames.put(TOK_SHOW_TRIGGERS, "TOK_SHOW_TRIGGERS");
+
+    tokenNames.put(TOK_CONTINUOUS_QUERY_CREATE, "TOK_CONTINUOUS_QUERY_CREATE");
+    tokenNames.put(TOK_CONTINUOUS_QUERY_DROP, "TOK_CONTINUOUS_QUERY_DROP");
+    tokenNames.put(TOK_SHOW_CONTINUOUS_QUERIES, "TOK_SHOW_CONTINUOUS_QUERIES");
   }
 
   public static boolean isReservedPath(PartialPath pathStr) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 4122240..50bfe2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.auth.entity.Role;
 import org.apache.iotdb.db.auth.entity.User;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
@@ -37,6 +38,7 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartiti
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
 import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
 import org.apache.iotdb.db.exception.QueryIdNotExsitException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TriggerExecutionException;
@@ -81,6 +83,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
 import org.apache.iotdb.db.qp.physical.sys.CountPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -88,6 +91,7 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -115,6 +119,7 @@ import org.apache.iotdb.db.query.control.QueryTimeManager.QueryInfo;
 import org.apache.iotdb.db.query.control.TracingManager;
 import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
+import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
 import org.apache.iotdb.db.query.dataset.ShowDevicesDataSet;
 import org.apache.iotdb.db.query.dataset.ShowTimeseriesDataSet;
 import org.apache.iotdb.db.query.dataset.SingleDataSet;
@@ -168,6 +173,11 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CREATED_TIME;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
@@ -349,6 +359,10 @@ public class PlanExecutor implements IPlanExecutor {
         return createDeviceTemplate((CreateTemplatePlan) plan);
       case SET_DEVICE_TEMPLATE:
         return setDeviceTemplate((SetDeviceTemplatePlan) plan);
+      case CREATE_CONTINUOUS_QUERY:
+        return operateCreateContinuousQuery((CreateContinuousQueryPlan) plan);
+      case DROP_CONTINUOUS_QUERY:
+        return operateDropContinuousQuery((DropContinuousQueryPlan) plan);
       default:
         throw new UnsupportedOperationException(
             String.format("operation %s is not supported", plan.getOperatorType()));
@@ -478,6 +492,16 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
+  private boolean operateCreateContinuousQuery(CreateContinuousQueryPlan plan)
+      throws ContinuousQueryException {
+    return ContinuousQueryService.getInstance().register(plan, true);
+  }
+
+  private boolean operateDropContinuousQuery(DropContinuousQueryPlan plan)
+      throws ContinuousQueryException {
+    return ContinuousQueryService.getInstance().deregister(plan);
+  }
+
   public static void flushSpecifiedStorageGroups(FlushPlan plan)
       throws StorageGroupNotSetException {
     Map<PartialPath, List<Pair<Long, Boolean>>> storageGroupMap =
@@ -591,6 +615,8 @@ public class PlanExecutor implements IPlanExecutor {
         return processShowFunctions((ShowFunctionsPlan) showPlan);
       case TRIGGERS:
         return processShowTriggers();
+      case CONTINUOUS_QUERY:
+        return processShowContinuousQueries();
       default:
         throw new QueryProcessException(String.format("Unrecognized show plan %s", showPlan));
     }
@@ -945,6 +971,38 @@ public class PlanExecutor implements IPlanExecutor {
     }
   }
 
+  private QueryDataSet processShowContinuousQueries() {
+    ListDataSet listDataSet =
+        new ListDataSet(
+            Arrays.asList(
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_NAME, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_QUERY_SQL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_TARGET_PATH, false)),
+            Arrays.asList(
+                TSDataType.TEXT,
+                TSDataType.INT64,
+                TSDataType.INT64,
+                TSDataType.TEXT,
+                TSDataType.TEXT));
+
+    List<ShowContinuousQueriesResult> continuousQueriesList =
+        ContinuousQueryService.getInstance().getShowContinuousQueriesResultList();
+
+    for (ShowContinuousQueriesResult result : continuousQueriesList) {
+      RowRecord record = new RowRecord(0);
+      record.addField(Binary.valueOf(result.getContinuousQueryName()), TSDataType.TEXT);
+      record.addField(result.getEveryInterval(), TSDataType.INT64);
+      record.addField(result.getForInterval(), TSDataType.INT64);
+      record.addField(Binary.valueOf(result.getQuerySql()), TSDataType.TEXT);
+      record.addField(Binary.valueOf(result.getTargetPath().getFullPath()), TSDataType.TEXT);
+      listDataSet.putRecord(record);
+    }
+
+    return listDataSet;
+  }
+
   private void appendNativeFunctions(ListDataSet listDataSet, ShowFunctionsPlan showPlan) {
     if (showPlan.showTemporary()) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index dc5272a..56eba91 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -165,5 +165,9 @@ public abstract class Operator {
     DELETE_PARTITION,
     LOAD_CONFIGURATION,
     CREATE_SCHEMA_SNAPSHOT,
+
+    CREATE_CONTINUOUS_QUERY,
+    DROP_CONTINUOUS_QUERY,
+    SHOW_CONTINUOUS_QUERIES
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
index bc3c366..7297460 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
@@ -47,6 +47,13 @@ public final class SelectComponent {
     this.zoneId = zoneId;
   }
 
+  public SelectComponent(SelectComponent selectComponent) {
+    zoneId = selectComponent.zoneId;
+    hasAggregationFunction = selectComponent.hasAggregationFunction;
+    hasTimeSeriesGeneratingFunction = selectComponent.hasTimeSeriesGeneratingFunction;
+    resultColumns.addAll(selectComponent.resultColumns);
+  }
+
   public ZoneId getZoneId() {
     return zoneId;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateContinuousQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateContinuousQueryOperator.java
new file mode 100644
index 0000000..682b7f4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateContinuousQueryOperator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+
+public class CreateContinuousQueryOperator extends Operator {
+
+  private String querySql;
+  private QueryOperator queryOperator;
+  private String continuousQueryName;
+  private PartialPath targetPath;
+  private long everyInterval;
+  private long forInterval;
+
+  public CreateContinuousQueryOperator(int tokenIntType) {
+    super(tokenIntType);
+    operatorType = OperatorType.CREATE_CONTINUOUS_QUERY;
+  }
+
+  public void setQuerySql(String querySql) {
+    this.querySql = querySql;
+  }
+
+  public String getQuerySql() {
+    return querySql;
+  }
+
+  public void setContinuousQueryName(String continuousQueryName) {
+    this.continuousQueryName = continuousQueryName;
+  }
+
+  public String getContinuousQueryName() {
+    return continuousQueryName;
+  }
+
+  public void setTargetPath(PartialPath targetPath) {
+    this.targetPath = targetPath;
+  }
+
+  public PartialPath getTargetPath() {
+    return targetPath;
+  }
+
+  public void setEveryInterval(long everyInterval) {
+    this.everyInterval = everyInterval;
+  }
+
+  public long getEveryInterval() {
+    return everyInterval;
+  }
+
+  public void setForInterval(long forInterval) {
+    this.forInterval = forInterval;
+  }
+
+  public long getForInterval() {
+    return forInterval;
+  }
+
+  public void setQueryOperator(QueryOperator queryOperator) {
+    this.queryOperator = queryOperator;
+  }
+
+  public QueryOperator getQueryOperator() {
+    return queryOperator;
+  }
+
+  @Override
+  public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
+      throws QueryProcessException {
+    return new CreateContinuousQueryPlan(
+        querySql, continuousQueryName, targetPath, everyInterval, forInterval, queryOperator);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropContinuousQueryOperator.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowOperator.java
copy to server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropContinuousQueryOperator.java
index f90ffd0..447f6dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropContinuousQueryOperator.java
@@ -15,44 +15,36 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
+
 package org.apache.iotdb.db.qp.logical.sys;
 
-import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
-import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 
-public class ShowOperator extends Operator {
+public class DropContinuousQueryOperator extends Operator {
 
-  public ShowOperator(int tokenIntType) {
-    this(tokenIntType, OperatorType.SHOW);
-  }
+  private String continuousQueryName;
 
-  public ShowOperator(int tokenIntType, OperatorType operatorType) {
+  public DropContinuousQueryOperator(int tokenIntType) {
     super(tokenIntType);
-    this.operatorType = operatorType;
+    operatorType = OperatorType.DROP_CONTINUOUS_QUERY;
+  }
+
+  public void setContinuousQueryName(String continuousQueryName) {
+    this.continuousQueryName = continuousQueryName;
+  }
+
+  public String getContinuousQueryName() {
+    return continuousQueryName;
   }
 
   @Override
   public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
       throws QueryProcessException {
-    switch (tokenIntType) {
-      case SQLConstant.TOK_FLUSH_TASK_INFO:
-        return new ShowPlan(ShowContentType.FLUSH_TASK_INFO);
-      case SQLConstant.TOK_VERSION:
-        return new ShowPlan(ShowContentType.VERSION);
-      case SQLConstant.TOK_QUERY_PROCESSLIST:
-        return new ShowQueryProcesslistPlan(ShowContentType.QUERY_PROCESSLIST);
-      default:
-        throw new LogicalOperatorException(
-            String.format("not supported operator type %s in show operation.", operatorType));
-    }
+    return new DropContinuousQueryPlan(continuousQueryName);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowContinuousQueriesOperator.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
copy to server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowContinuousQueriesOperator.java
index deb3272..c5669ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowContinuousQueriesOperator.java
@@ -16,23 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.metadata;
 
-public class MetadataOperationType {
+package org.apache.iotdb.db.qp.logical.sys;
 
-  private MetadataOperationType() {
-    // allowed to do nothing
-  }
+public class ShowContinuousQueriesOperator extends ShowOperator {
 
-  public static final String CREATE_TIMESERIES = "0";
-  public static final String DELETE_TIMESERIES = "1";
-  public static final String SET_STORAGE_GROUP = "2";
-  public static final String CREATE_ALIGNED_TIMESERIES = "3";
-  public static final String AUTO_CREATE_DEVICE_MNODE = "4";
-  public static final String SET_TTL = "10";
-  public static final String DELETE_STORAGE_GROUP = "11";
-  public static final String CREATE_INDEX = "31";
-  public static final String DROP_INDEX = "32";
-  public static final String CHANGE_OFFSET = "12";
-  public static final String CHANGE_ALIAS = "13";
+  public ShowContinuousQueriesOperator(int tokenIntType) {
+    super(tokenIntType);
+    operatorType = OperatorType.SHOW_CONTINUOUS_QUERIES;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowOperator.java
index f90ffd0..689c13b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowContinuousQueriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
@@ -50,6 +51,8 @@ public class ShowOperator extends Operator {
         return new ShowPlan(ShowContentType.VERSION);
       case SQLConstant.TOK_QUERY_PROCESSLIST:
         return new ShowQueryProcesslistPlan(ShowContentType.QUERY_PROCESSLIST);
+      case SQLConstant.TOK_SHOW_CONTINUOUS_QUERIES:
+        return new ShowContinuousQueriesPlan();
       default:
         throw new LogicalOperatorException(
             String.format("not supported operator type %s in show operation.", operatorType));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index af6a4a0..97d5daa 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -43,6 +44,7 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
@@ -372,6 +374,12 @@ public abstract class PhysicalPlan {
         case AUTO_CREATE_DEVICE_MNODE:
           plan = new AutoCreateDeviceMNodePlan();
           break;
+        case CREATE_CONTINUOUS_QUERY:
+          plan = new CreateContinuousQueryPlan();
+          break;
+        case DROP_CONTINUOUS_QUERY:
+          plan = new DropContinuousQueryPlan();
+          break;
         default:
           throw new IOException("unrecognized log type " + type);
       }
@@ -427,7 +435,10 @@ public abstract class PhysicalPlan {
     CREATE_TRIGGER,
     DROP_TRIGGER,
     START_TRIGGER,
-    STOP_TRIGGER
+    STOP_TRIGGER,
+    CREATE_CONTINUOUS_QUERY,
+    DROP_CONTINUOUS_QUERY,
+    SHOW_CONTINUOUS_QUERIES
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
new file mode 100644
index 0000000..a9b7115
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class CreateContinuousQueryPlan extends PhysicalPlan {
+
+  private String querySql;
+  private String continuousQueryName;
+  private PartialPath targetPath;
+  private long everyInterval;
+  private long forInterval;
+  private QueryOperator queryOperator;
+  private long creationTimestamp;
+
+  public CreateContinuousQueryPlan() {
+    super(false, Operator.OperatorType.CREATE_CONTINUOUS_QUERY);
+  }
+
+  public CreateContinuousQueryPlan(
+      String querySql,
+      String continuousQueryName,
+      PartialPath targetPath,
+      long everyInterval,
+      long forInterval,
+      QueryOperator queryOperator) {
+    super(false, Operator.OperatorType.CREATE_CONTINUOUS_QUERY);
+    this.querySql = querySql;
+    this.continuousQueryName = continuousQueryName;
+    this.targetPath = targetPath;
+    this.everyInterval = everyInterval;
+    this.forInterval = forInterval;
+    this.queryOperator = queryOperator;
+    this.creationTimestamp = DatetimeUtils.currentTime();
+  }
+
+  public void setQuerySql(String querySql) {
+    this.querySql = querySql;
+  }
+
+  public String getQuerySql() {
+    return querySql;
+  }
+
+  public void setContinuousQueryName(String continuousQueryName) {
+    this.continuousQueryName = continuousQueryName;
+  }
+
+  public String getContinuousQueryName() {
+    return continuousQueryName;
+  }
+
+  public void setTargetPath(PartialPath targetPath) {
+    this.targetPath = targetPath;
+  }
+
+  public PartialPath getTargetPath() {
+    return targetPath;
+  }
+
+  public void setEveryInterval(long everyInterval) {
+    this.everyInterval = everyInterval;
+  }
+
+  public long getEveryInterval() {
+    return everyInterval;
+  }
+
+  public void setForInterval(long forInterval) {
+    this.forInterval = forInterval;
+  }
+
+  public long getForInterval() {
+    return forInterval;
+  }
+
+  public void setQueryOperator(QueryOperator queryOperator) {
+    this.queryOperator = queryOperator;
+  }
+
+  public QueryOperator getQueryOperator() {
+    return queryOperator;
+  }
+
+  public void setCreationTimestamp(long creationTimestamp) {
+    this.creationTimestamp = creationTimestamp;
+  }
+
+  public long getCreationTimestamp() {
+    return creationTimestamp;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    buffer.put((byte) PhysicalPlanType.CREATE_CONTINUOUS_QUERY.ordinal());
+    ReadWriteIOUtils.write(continuousQueryName, buffer);
+    ReadWriteIOUtils.write(querySql, buffer);
+    ReadWriteIOUtils.write(targetPath.getFullPath(), buffer);
+    buffer.putLong(everyInterval);
+    buffer.putLong(forInterval);
+    buffer.putLong(creationTimestamp);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    continuousQueryName = ReadWriteIOUtils.readString(buffer);
+    querySql = ReadWriteIOUtils.readString(buffer);
+    targetPath = new PartialPath(ReadWriteIOUtils.readString(buffer));
+    everyInterval = ReadWriteIOUtils.readLong(buffer);
+    forInterval = ReadWriteIOUtils.readLong(buffer);
+    creationTimestamp = ReadWriteIOUtils.readLong(buffer);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java
new file mode 100644
index 0000000..1faa458
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DropContinuousQueryPlan extends PhysicalPlan {
+
+  private String continuousQueryName;
+
+  public DropContinuousQueryPlan() {
+    super(false, Operator.OperatorType.DROP_CONTINUOUS_QUERY);
+  }
+
+  public DropContinuousQueryPlan(String continuousQueryName) {
+    super(false, Operator.OperatorType.DROP_CONTINUOUS_QUERY);
+    this.continuousQueryName = continuousQueryName;
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return new ArrayList<>();
+  }
+
+  public String getContinuousQueryName() {
+    return continuousQueryName;
+  }
+
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    buffer.put((byte) PhysicalPlanType.DROP_CONTINUOUS_QUERY.ordinal());
+    ReadWriteIOUtils.write(continuousQueryName, buffer);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    continuousQueryName = ReadWriteIOUtils.readString(buffer);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowContinuousQueriesPlan.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowContinuousQueriesPlan.java
index deb3272..7a4b6ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetadataOperationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowContinuousQueriesPlan.java
@@ -16,23 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.metadata;
 
-public class MetadataOperationType {
+package org.apache.iotdb.db.qp.physical.sys;
 
-  private MetadataOperationType() {
-    // allowed to do nothing
-  }
+public class ShowContinuousQueriesPlan extends ShowPlan {
 
-  public static final String CREATE_TIMESERIES = "0";
-  public static final String DELETE_TIMESERIES = "1";
-  public static final String SET_STORAGE_GROUP = "2";
-  public static final String CREATE_ALIGNED_TIMESERIES = "3";
-  public static final String AUTO_CREATE_DEVICE_MNODE = "4";
-  public static final String SET_TTL = "10";
-  public static final String DELETE_STORAGE_GROUP = "11";
-  public static final String CREATE_INDEX = "31";
-  public static final String DROP_INDEX = "32";
-  public static final String CHANGE_OFFSET = "12";
-  public static final String CHANGE_ALIAS = "13";
+  public ShowContinuousQueriesPlan() {
+    super(ShowContentType.CONTINUOUS_QUERY);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
index 60c1537..a66365a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ShowPlan.java
@@ -116,6 +116,7 @@ public class ShowPlan extends PhysicalPlan {
     COUNT_STORAGE_GROUP,
     QUERY_PROCESSLIST,
     TRIGGERS,
-    LOCK_INFO
+    LOCK_INFO,
+    CONTINUOUS_QUERY
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 515b9a5..cd2dfe3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
 import org.apache.iotdb.db.qp.logical.sys.ClearCacheOperator;
 import org.apache.iotdb.db.qp.logical.sys.CountOperator;
+import org.apache.iotdb.db.qp.logical.sys.CreateContinuousQueryOperator;
 import org.apache.iotdb.db.qp.logical.sys.CreateFunctionOperator;
 import org.apache.iotdb.db.qp.logical.sys.CreateIndexOperator;
 import org.apache.iotdb.db.qp.logical.sys.CreateSnapshotOperator;
@@ -63,6 +64,7 @@ import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator;
 import org.apache.iotdb.db.qp.logical.sys.DeletePartitionOperator;
 import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
 import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
+import org.apache.iotdb.db.qp.logical.sys.DropContinuousQueryOperator;
 import org.apache.iotdb.db.qp.logical.sys.DropFunctionOperator;
 import org.apache.iotdb.db.qp.logical.sys.DropIndexOperator;
 import org.apache.iotdb.db.qp.logical.sys.DropTriggerOperator;
@@ -79,6 +81,7 @@ import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator;
 import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowChildNodesOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowChildPathsOperator;
+import org.apache.iotdb.db.qp.logical.sys.ShowContinuousQueriesOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowDevicesOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowFunctionsOperator;
 import org.apache.iotdb.db.qp.logical.sys.ShowLockInfoOperator;
@@ -107,6 +110,9 @@ import org.apache.iotdb.db.qp.sql.SqlBaseParser.CountDevicesContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.CountNodesContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.CountStorageGroupContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.CountTimeseriesContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.CqGroupByTimeClauseContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.CqSelectIntoClauseContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.CreateContinuousQueryStatementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.CreateFunctionContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.CreateIndexContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.CreateRoleContext;
@@ -119,6 +125,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseParser.DeletePartitionContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.DeleteStatementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.DeleteStorageGroupContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.DeleteTimeseriesContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.DropContinuousQueryStatementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.DropFunctionContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.DropIndexContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.DropRoleContext;
@@ -179,6 +186,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseParser.PrivilegesContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.PropertyContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.PropertyValueContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.RemoveFileContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.ResampleClauseContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ResultColumnContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.RevokeRoleContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.RevokeRoleFromUserContext;
@@ -193,6 +201,7 @@ import org.apache.iotdb.db.qp.sql.SqlBaseParser.SetTTLStatementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowAllTTLStatementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowChildNodesContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowChildPathsContext;
+import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowContinuousQueriesStatementContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowDevicesContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowFlushTaskInfoContext;
 import org.apache.iotdb.db.qp.sql.SqlBaseParser.ShowFunctionsContext;
@@ -260,6 +269,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.iotdb.db.index.common.IndexConstant.PATTERN;
 import static org.apache.iotdb.db.index.common.IndexConstant.THRESHOLD;
@@ -272,6 +283,9 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
   private static final String DELETE_RANGE_ERROR_MSG =
       "For delete statement, where clause can only contain atomic expressions like : "
           + "time > XXX, time <= XXX, or two atomic expressions connected by 'AND'";
+
+  private static final Pattern cqLevelNodePattern = Pattern.compile("\\$\\{\\w+}");
+
   private ZoneId zoneId;
   private QueryOperator queryOp;
 
@@ -1086,6 +1100,175 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
         resultColumnContext.AS() == null ? null : resultColumnContext.ID().getText());
   }
 
+  @Override
+  public Operator visitDropContinuousQueryStatement(DropContinuousQueryStatementContext ctx) {
+    DropContinuousQueryOperator dropContinuousQueryOperator =
+        new DropContinuousQueryOperator(SQLConstant.TOK_CONTINUOUS_QUERY_DROP);
+    dropContinuousQueryOperator.setContinuousQueryName(ctx.continuousQueryName.getText());
+    return dropContinuousQueryOperator;
+  }
+
+  @Override
+  public Operator visitShowContinuousQueriesStatement(ShowContinuousQueriesStatementContext ctx) {
+    return new ShowContinuousQueriesOperator(SQLConstant.TOK_SHOW_CONTINUOUS_QUERIES);
+  }
+
+  @Override
+  public Operator visitCreateContinuousQueryStatement(CreateContinuousQueryStatementContext ctx) {
+    CreateContinuousQueryOperator createContinuousQueryOperator =
+        new CreateContinuousQueryOperator(SQLConstant.TOK_CONTINUOUS_QUERY_CREATE);
+
+    createContinuousQueryOperator.setQuerySql(ctx.getText());
+
+    createContinuousQueryOperator.setContinuousQueryName(ctx.continuousQueryName.getText());
+
+    if (ctx.resampleClause() != null) {
+      parseResampleClause(ctx.resampleClause(), createContinuousQueryOperator);
+    }
+
+    parseCqSelectIntoClause(ctx.cqSelectIntoClause(), createContinuousQueryOperator);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("select ");
+    sb.append(ctx.cqSelectIntoClause().selectClause().getText().substring(6));
+    sb.append(" from ");
+    sb.append(ctx.cqSelectIntoClause().fromClause().prefixPath(0).getText());
+
+    sb.append(" group by ([now() - ");
+    String groupByInterval = ctx.cqSelectIntoClause().cqGroupByTimeClause().DURATION().getText();
+    if (createContinuousQueryOperator.getForInterval() == 0) {
+      sb.append(groupByInterval);
+    } else {
+      List<TerminalNode> durations = ctx.resampleClause().DURATION();
+      sb.append(durations.get(durations.size() - 1).getText());
+    }
+    sb.append(", now()), ");
+    sb.append(groupByInterval);
+    sb.append(")");
+    if (queryOp.isGroupByLevel()) {
+      sb.append(", level = ");
+      sb.append(queryOp.getSpecialClauseComponent().getLevel());
+    }
+    createContinuousQueryOperator.setQuerySql(sb.toString());
+
+    if (createContinuousQueryOperator.getEveryInterval() == 0) {
+      createContinuousQueryOperator.setEveryInterval(
+          ((GroupByClauseComponent) queryOp.getSpecialClauseComponent()).getUnit());
+    }
+
+    if (createContinuousQueryOperator.getEveryInterval()
+        < IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval()) {
+      throw new SQLParserException(
+          "CQ: every interval should not be lower than the minimum value you configured.");
+    }
+
+    if (createContinuousQueryOperator.getForInterval() == 0) {
+      createContinuousQueryOperator.setForInterval(
+          ((GroupByClauseComponent) queryOp.getSpecialClauseComponent()).getUnit());
+    }
+
+    return createContinuousQueryOperator;
+  }
+
+  public void parseResampleClause(
+      ResampleClauseContext ctx, CreateContinuousQueryOperator operator) {
+
+    if (ctx.DURATION().size() == 1) {
+      if (ctx.EVERY() != null) {
+        operator.setEveryInterval(
+            DatetimeUtils.convertDurationStrToLong(ctx.DURATION(0).getText()));
+      } else if (ctx.FOR() != null) {
+        operator.setForInterval(DatetimeUtils.convertDurationStrToLong(ctx.DURATION(0).getText()));
+      }
+    } else if (ctx.DURATION().size() == 2) {
+      operator.setEveryInterval(DatetimeUtils.convertDurationStrToLong(ctx.DURATION(0).getText()));
+      operator.setForInterval(DatetimeUtils.convertDurationStrToLong(ctx.DURATION(1).getText()));
+    }
+  }
+
+  public void parseCqSelectIntoClause(
+      CqSelectIntoClauseContext ctx, CreateContinuousQueryOperator createContinuousQueryOperator) {
+
+    queryOp = new GroupByQueryOperator();
+
+    parseSelectClause(ctx.selectClause());
+    parseFromClause(ctx.fromClause());
+
+    if (queryOp.getSelectComponent().getResultColumns().size() > 1) {
+      throw new SQLParserException("CQ: CQ currently does not support multiple result columns.");
+    }
+
+    if (queryOp.getFromComponent().getPrefixPaths().size() > 1) {
+      throw new SQLParserException("CQ: CQ currently does not support multiple series .");
+    }
+
+    parseCqGroupByTimeClause(ctx.cqGroupByTimeClause());
+
+    int fromLen = queryOp.getFromComponent().getPrefixPaths().get(0).getNodeLength();
+    int queryLevel = queryOp.getSpecialClauseComponent().getLevel();
+    if (queryLevel >= fromLen) {
+      throw new SQLParserException("CQ: Level should not exceed the <from_prefix> length.");
+    }
+
+    PartialPath targetPath = null;
+
+    int trueLevel = queryLevel;
+    if (trueLevel == -1) {
+      trueLevel = fromLen - 1;
+    }
+
+    if (ctx.fullPath() != null) {
+      targetPath = parseFullPath(ctx.fullPath());
+      Matcher m = cqLevelNodePattern.matcher(targetPath.getFullPath());
+      while (m.find()) {
+        String param = m.group();
+        int nodeIndex = 0;
+        try {
+          nodeIndex = Integer.parseInt(param.substring(2, param.length() - 1).trim());
+        } catch (NumberFormatException e) {
+          throw new SQLParserException("CQ: x of ${x} should be an integer.");
+        }
+        if (nodeIndex < 1 || nodeIndex > trueLevel) {
+          throw new SQLParserException(
+              "CQ: x of ${x} should be greater than 0 and equal to or less than <level> or the length of queried path prefix.");
+        }
+      }
+    } else if (ctx.nodeNameWithoutStar() != null) {
+
+      List<String> targetNodes = new ArrayList<>();
+
+      targetNodes.add("root");
+
+      for (int i = 1; i <= trueLevel; i++) {
+        targetNodes.add("${" + i + "}");
+      }
+      targetNodes.add(ctx.nodeNameWithoutStar().getText());
+      targetPath = new PartialPath(targetNodes.toArray(new String[0]));
+    }
+
+    createContinuousQueryOperator.setTargetPath(targetPath);
+    createContinuousQueryOperator.setQueryOperator(queryOp);
+  }
+
+  public void parseCqGroupByTimeClause(CqGroupByTimeClauseContext ctx) {
+
+    GroupByClauseComponent groupByClauseComponent = new GroupByClauseComponent();
+
+    groupByClauseComponent.setUnit(
+        parseTimeUnitOrSlidingStep(ctx.DURATION().getText(), true, groupByClauseComponent));
+
+    groupByClauseComponent.setSlidingStep(groupByClauseComponent.getUnit());
+    groupByClauseComponent.setSlidingStepByMonth(groupByClauseComponent.isIntervalByMonth());
+
+    groupByClauseComponent.setLeftCRightO(true);
+
+    if (ctx.LEVEL() != null && ctx.INT() != null) {
+      groupByClauseComponent.setLevel(Integer.parseInt(ctx.INT().getText()));
+    }
+
+    queryOp.setSpecialClauseComponent(groupByClauseComponent);
+  }
+
   @SuppressWarnings("squid:S3776")
   private Expression parseExpression(ExpressionContext context) {
     // unary
@@ -2125,19 +2308,8 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
     if (timestampStr == null || timestampStr.trim().equals("")) {
       throw new SQLParserException("input timestamp cannot be empty");
     }
-    long startupNano = IoTDBDescriptor.getInstance().getConfig().getStartUpNanosecond();
     if (timestampStr.equalsIgnoreCase(SQLConstant.NOW_FUNC)) {
-      String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
-      switch (timePrecision) {
-        case "ns":
-          return System.currentTimeMillis() * 1000_000
-              + (System.nanoTime() - startupNano) % 1000_000;
-        case "us":
-          return System.currentTimeMillis() * 1000
-              + (System.nanoTime() - startupNano) / 1000 % 1000;
-        default:
-          return System.currentTimeMillis();
-      }
+      return DatetimeUtils.currentTime();
     }
     try {
       return DatetimeUtils.convertDatetimeStrToLong(timestampStr, zoneId);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
index 563f7d4..e0b1c26 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DatetimeUtils.java
@@ -32,6 +32,7 @@ import java.time.format.DateTimeParseException;
 import java.time.format.SignStyle;
 import java.time.temporal.ChronoField;
 import java.util.Calendar;
+import java.util.concurrent.TimeUnit;
 
 public class DatetimeUtils {
 
@@ -621,6 +622,29 @@ public class DatetimeUtils {
     }
   }
 
+  public static TimeUnit timestampPrecisionStringToTimeUnit(String timestampPrecision) {
+    if (timestampPrecision.equals("us")) {
+      return TimeUnit.MICROSECONDS;
+    } else if (timestampPrecision.equals("ns")) {
+      return TimeUnit.NANOSECONDS;
+    } else {
+      return TimeUnit.MILLISECONDS;
+    }
+  }
+
+  public static long currentTime() {
+    long startupNano = IoTDBDescriptor.getInstance().getConfig().getStartUpNanosecond();
+    String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
+    switch (timePrecision) {
+      case "ns":
+        return System.currentTimeMillis() * 1000_000 + (System.nanoTime() - startupNano) % 1000_000;
+      case "us":
+        return System.currentTimeMillis() * 1000 + (System.nanoTime() - startupNano) / 1000 % 1000;
+      default:
+        return System.currentTimeMillis();
+    }
+  }
+
   public static ZoneOffset toZoneOffset(ZoneId zoneId) {
     return zoneId.getRules().getOffset(Instant.now());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java
new file mode 100644
index 0000000..4fb831b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+
+public class ShowContinuousQueriesResult extends ShowResult {
+
+  private String querySql;
+  private String continuousQueryName;
+  private PartialPath targetPath;
+  private long everyInterval;
+  private long forInterval;
+
+  public ShowContinuousQueriesResult(
+      String querySql,
+      String continuousQueryName,
+      PartialPath targetPath,
+      long everyInterval,
+      long forInterval) {
+    this.querySql = querySql;
+    this.continuousQueryName = continuousQueryName;
+    this.targetPath = targetPath;
+    this.everyInterval = everyInterval;
+    this.forInterval = forInterval;
+  }
+
+  public String getQuerySql() {
+    return querySql;
+  }
+
+  public void setQuerySql(String querySql) {
+    this.querySql = querySql;
+  }
+
+  public String getContinuousQueryName() {
+    return continuousQueryName;
+  }
+
+  public void setContinuousQueryName(String continuousQueryName) {
+    this.continuousQueryName = continuousQueryName;
+  }
+
+  public PartialPath getTargetPath() {
+    return targetPath;
+  }
+
+  public void setTargetPath(PartialPath targetPath) {
+    this.targetPath = targetPath;
+  }
+
+  public long getEveryInterval() {
+    return everyInterval;
+  }
+
+  public void setEveryInterval(long everyInterval) {
+    this.everyInterval = everyInterval;
+  }
+
+  public long getForInterval() {
+    return forInterval;
+  }
+
+  public void setForInterval(long forInterval) {
+    this.forInterval = forInterval;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 0c61934..df01f00 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.query.dataset.groupby;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -80,8 +82,13 @@ public class GroupByTimeDataSet extends QueryDataSet {
 
     this.dataTypes = new ArrayList<>();
     this.paths = new ArrayList<>();
-    for (int i = 0; i < finalPaths.size(); i++) {
-      this.dataTypes.add(TSDataType.INT64);
+    for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
+      try {
+        this.paths.add(new PartialPath(entry.getKey()));
+      } catch (IllegalPathException e) {
+        logger.error("Query result IllegalPathException occurred: {}.", entry.getKey());
+      }
+      this.dataTypes.add(entry.getValue().getResultDataType());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 51a64ac..1154e3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConfigCheck;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
+import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
@@ -115,6 +116,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(UDFClassLoaderManager.getInstance());
     registerManager.register(UDFRegistrationService.getInstance());
     registerManager.register(TriggerRegistrationService.getInstance());
+    registerManager.register(ContinuousQueryService.getInstance());
 
     // in cluster mode, RPC service is not enabled.
     if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index dc652cc..2f61d90 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -53,7 +53,7 @@ public enum ServiceType {
       "Flush ServerService", generateJmxName("org.apache.iotdb.db.engine.pool", "Flush Manager")),
   CLUSTER_MONITOR_SERVICE("Cluster Monitor ServerService", "Cluster Monitor"),
   SYSTEMINFO_SERVICE("MemTable Monitor Service", "MemTable, Monitor"),
-
+  CONTINUOUS_QUERY_SERVICE("Continuous Query Service", "Continuous Query Service"),
   CLUSTER_INFO_SERVICE("Cluster Monitor Service (thrift-based)", "Cluster Monitor-Thrift"),
   ;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogParser.java b/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogParser.java
index 6b570ee..bceac1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogParser.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/mlog/MLogParser.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
 import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -202,6 +204,12 @@ public class MLogParser {
           case MNODE:
             mLogTxtWriter.serializeMNode((MNodePlan) plan);
             break;
+          case CREATE_CONTINUOUS_QUERY:
+            mLogTxtWriter.createContinuousQuery((CreateContinuousQueryPlan) plan);
+            break;
+          case DROP_CONTINUOUS_QUERY:
+            mLogTxtWriter.dropContinuousQuery((DropContinuousQueryPlan) plan);
+            break;
           default:
             logger.warn("unknown plan {}", plan);
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index 7e0c150..3a94159 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -102,4 +102,27 @@ public class TypeInferenceUtils {
 
     return TSDataType.TEXT;
   }
+
+  public static TSDataType getAggrDataType(String aggrFuncName, TSDataType dataType) {
+    if (aggrFuncName == null) {
+      throw new IllegalArgumentException("AggregateFunction Name must not be null");
+    }
+
+    switch (aggrFuncName.toLowerCase()) {
+      case SQLConstant.MIN_TIME:
+      case SQLConstant.MAX_TIME:
+      case SQLConstant.COUNT:
+        return TSDataType.INT64;
+      case SQLConstant.MIN_VALUE:
+      case SQLConstant.LAST_VALUE:
+      case SQLConstant.FIRST_VALUE:
+      case SQLConstant.MAX_VALUE:
+        return dataType;
+      case SQLConstant.AVG:
+      case SQLConstant.SUM:
+        return TSDataType.DOUBLE;
+      default:
+        throw new IllegalArgumentException("Invalid Aggregation function: " + aggrFuncName);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/auth/AuthorityCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/auth/AuthorityCheckerTest.java
index f281796..4a91ee5 100644
--- a/server/src/test/java/org/apache/iotdb/db/auth/AuthorityCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/auth/AuthorityCheckerTest.java
@@ -86,6 +86,11 @@ public class AuthorityCheckerTest {
         user.getName(), nodeName, PrivilegeType.SET_STORAGE_GROUP.ordinal());
     authorizer.grantPrivilegeToUser(
         user.getName(), nodeName, PrivilegeType.CREATE_TIMESERIES.ordinal());
+    authorizer.grantPrivilegeToUser(
+        user.getName(), nodeName, PrivilegeType.CREATE_CONTINUOUS_QUERY.ordinal());
+    authorizer.grantPrivilegeToUser(
+        user.getName(), nodeName, PrivilegeType.DROP_CONTINUOUS_QUERY.ordinal());
+
     Assert.assertTrue(
         AuthorityChecker.check(
             user.getName(),
@@ -232,5 +237,19 @@ public class AuthorityCheckerTest {
             Collections.singletonList(new PartialPath(nodeName)),
             OperatorType.GROUP_BY_FILL,
             user.getName()));
+
+    Assert.assertTrue(
+        AuthorityChecker.check(
+            user.getName(),
+            Collections.singletonList(new PartialPath(nodeName)),
+            OperatorType.CREATE_CONTINUOUS_QUERY,
+            user.getName()));
+
+    Assert.assertTrue(
+        AuthorityChecker.check(
+            user.getName(),
+            Collections.singletonList(new PartialPath(nodeName)),
+            OperatorType.DROP_CONTINUOUS_QUERY,
+            user.getName()));
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
new file mode 100644
index 0000000..c72c04e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Notice that, all test begins with "IoTDB" is integration test. All test which will start the
+ * IoTDB server should be defined as integration test.
+ */
+public class IoTDBContinuousQueryIT {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBContinuousQueryIT.class);
+
+  private Statement statement;
+  private Connection connection;
+  private volatile Exception exception = null;
+
+  private final Thread dataGenerator =
+      new Thread() {
+
+        @Override
+        public void run() {
+
+          try (Connection connection =
+                  DriverManager.getConnection(
+                      Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+              Statement statement = connection.createStatement()) {
+
+            do {
+
+              for (String timeSeries : timeSeriesArray) {
+                try {
+                  statement.execute(
+                      String.format(
+                          "insert into %s(timestamp, temperature) values(now(), %.3f)",
+                          timeSeries, 200 * Math.random()));
+                } catch (SQLException throwables) {
+                  LOGGER.error(throwables.getMessage());
+                }
+              }
+            } while (!isInterrupted());
+          } catch (SQLException e) {
+            exception = e;
+          }
+        }
+      };
+
+  private void startDataGenerator() {
+    dataGenerator.start();
+  }
+
+  private void stopDataGenerator() throws InterruptedException {
+    dataGenerator.interrupt();
+    dataGenerator.join();
+    if (exception != null) {
+      fail(exception.getMessage());
+    }
+  }
+
+  String[] timeSeriesArray = {
+    "root.ln.wf01.wt01.ws01",
+    "root.ln.wf01.wt01.ws02",
+    "root.ln.wf01.wt02.ws01",
+    "root.ln.wf01.wt02.ws02",
+    "root.ln.wf02.wt01.ws01",
+    "root.ln.wf02.wt01.ws02",
+    "root.ln.wf02.wt02.ws01",
+    "root.ln.wf02.wt02.ws02"
+  };
+
+  private void createTimeSeries() throws SQLException {
+    for (String timeSeries : timeSeriesArray) {
+      statement.execute(
+          String.format(
+              "create timeseries %s.temperature with datatype=FLOAT,encoding=RLE", timeSeries));
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+    statement = connection.createStatement();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    statement.close();
+    connection.close();
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testCreateAndDropContinuousQuery() throws Exception {
+
+    createTimeSeries();
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* "
+            + "GROUP BY time(1s) END");
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq2 "
+            + "BEGIN SELECT count(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
+            + " GROUP BY time(1s), level=3 END");
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq3 "
+            + "RESAMPLE EVERY 2s FOR 2s "
+            + "BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+            + "GROUP BY time(1s), level=2 END");
+
+    statement.execute("DROP CONTINUOUS QUERY cq1");
+    statement.execute("DROP CONTINUOUS QUERY cq2");
+
+    checkContinuousQueries(new String[] {"cq3"});
+
+    EnvironmentUtils.shutdownDaemon();
+
+    EnvironmentUtils.stopDaemon();
+
+    setUp();
+
+    checkContinuousQueries(new String[] {"cq3"});
+
+    try {
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq3 "
+              + "RESAMPLE EVERY 2s FOR 2s "
+              + "BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+              + "GROUP BY time(1s), level=2 END");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("already exists"));
+    }
+
+    try {
+
+      statement.execute("DROP CONTINUOUS QUERY cq1");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("not exist"));
+    }
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* "
+            + "GROUP BY time(1s) END");
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq2 "
+            + "BEGIN SELECT count(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
+            + " GROUP BY time(1s), level=3 END");
+
+    checkContinuousQueries(new String[] {"cq3", "cq1", "cq2"});
+
+    statement.execute("DROP CONTINUOUS QUERY cq1");
+    statement.execute("DROP CONTINUOUS QUERY cq2");
+    statement.execute("DROP CONTINUOUS QUERY cq3");
+  }
+
+  @Test
+  public void testContinuousQueryResultSeries() throws Exception {
+    createTimeSeries();
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT count(temperature) INTO temperature_cnt FROM root.ln.*.*.* "
+            + "GROUP BY time(1s), level=2 END");
+
+    Thread.sleep(5500);
+
+    checkTimeSeries(
+        new String[] {
+          "root.ln.wf01.wt01.ws01.temperature",
+          "root.ln.wf01.wt01.ws02.temperature",
+          "root.ln.wf01.wt02.ws01.temperature",
+          "root.ln.wf01.wt02.ws02.temperature",
+          "root.ln.wf02.wt01.ws01.temperature",
+          "root.ln.wf02.wt01.ws02.temperature",
+          "root.ln.wf02.wt02.ws01.temperature",
+          "root.ln.wf02.wt02.ws02.temperature",
+          "root.ln.wf01.temperature_cnt",
+          "root.ln.wf02.temperature_cnt"
+        });
+
+    statement.execute("DROP CONTINUOUS QUERY cq1");
+
+    stopDataGenerator();
+  }
+
+  @Test
+  public void testContinuousQueryResult() throws Exception {
+    createTimeSeries();
+
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    statement.execute(
+        "CREATE CQ cq1 "
+            + "RESAMPLE EVERY 1s FOR 1s "
+            + "BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+            + "GROUP BY time(1s), level=2 END");
+
+    long creationTime = System.currentTimeMillis();
+
+    Thread.sleep(5500);
+
+    boolean hasResult = statement.execute("select temperature_avg from root.ln.wf01");
+    Assert.assertTrue(hasResult);
+
+    checkCQExecutionResult(creationTime, 0, 5000, 1000, 1000, 1000, 2);
+
+    statement.execute("DROP CQ cq1");
+
+    stopDataGenerator();
+  }
+
+  @Test
+  public void testContinuousQueryResult2() throws Exception {
+    createTimeSeries();
+
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "RESAMPLE EVERY 2s "
+            + "BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+            + "GROUP BY time(1s), level=2 END");
+
+    long creationTime = System.currentTimeMillis();
+
+    Thread.sleep(5500);
+
+    boolean hasResult = statement.execute("select temperature_avg from root.ln.wf01");
+    Assert.assertTrue(hasResult);
+
+    checkCQExecutionResult(creationTime, 0, 5000, 1000, 2000, 1000, 2);
+
+    statement.execute("DROP CQ cq1");
+
+    stopDataGenerator();
+  }
+
+  @Test
+  public void testContinuousQueryResult3() throws Exception {
+    createTimeSeries();
+
+    startDataGenerator();
+
+    Thread.sleep(500);
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+            + "GROUP BY time(1s), level=2 END");
+
+    long creationTime = System.currentTimeMillis();
+
+    Thread.sleep(5500);
+
+    boolean hasResult = statement.execute("select temperature_avg from root.ln.wf01");
+    Assert.assertTrue(hasResult);
+
+    checkCQExecutionResult(creationTime, 0, 5000, 1000, 1000, 1000, 2);
+
+    statement.execute("DROP CQ cq1");
+
+    stopDataGenerator();
+  }
+
+  @Test
+  public void testContinuousQueryResult4() throws Exception {
+
+    statement.execute(
+        "CREATE CONTINUOUS QUERY cq1 "
+            + "BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+            + "GROUP BY time(1s), level=2 END");
+
+    long creationTime = System.currentTimeMillis();
+
+    Thread.sleep(4500);
+
+    createTimeSeries();
+
+    startDataGenerator();
+
+    Thread.sleep(6000);
+
+    checkCQExecutionResult(creationTime, 5000, 5500, 1000, 1000, 1000, 2);
+
+    statement.execute("DROP CQ cq1");
+
+    stopDataGenerator();
+  }
+
+  private void checkCQExecutionResult(
+      long creationTime,
+      long delay,
+      long duration,
+      long forInterval,
+      long everyInterval,
+      long groupByInterval,
+      int level)
+      throws SQLException {
+    boolean hasResult = statement.execute("select temperature_avg from root.ln.wf01");
+    Assert.assertTrue(hasResult);
+
+    List<Pair<Long, String>> result = generateResult();
+
+    long expectedSize = (duration / everyInterval + 1) * (forInterval / groupByInterval);
+    Assert.assertEquals(expectedSize, result.size());
+
+    long leftMost = result.get(0).left + forInterval;
+
+    for (int i = 0; i < result.size(); i++) {
+      long left = result.get(i).left;
+
+      if (i == 0) {
+        assertTrue(Math.abs(creationTime + delay - forInterval - left) <= 100);
+      } else {
+        long pointNumPerForInterval = forInterval / groupByInterval;
+        Assert.assertEquals(
+            leftMost
+                + (i / pointNumPerForInterval) * everyInterval
+                - (pointNumPerForInterval - i % pointNumPerForInterval) * groupByInterval,
+            left);
+      }
+
+      statement.execute(
+          String.format(
+              "select avg(temperature) from root.ln.wf01.*.* GROUP BY ([%d, %d), %dms), level=%d",
+              left, left + groupByInterval, groupByInterval, level));
+
+      List<Pair<Long, String>> correctAnswer = generateResult();
+      Assert.assertEquals(1, correctAnswer.size());
+
+      Assert.assertEquals(correctAnswer.get(0).right, result.get(i).right);
+    }
+  }
+
+  private List<Pair<Long, String>> generateResult() {
+    List<Pair<Long, String>> result = new ArrayList<>();
+    try (ResultSet resultSet = statement.getResultSet()) {
+      while (resultSet.next()) {
+        String timestamp = resultSet.getString(1);
+        String value = resultSet.getString(2);
+        result.add(new Pair<>(Long.parseLong(timestamp), value));
+      }
+    } catch (SQLException throwables) {
+      LOGGER.error(throwables.getMessage());
+    }
+    return result;
+  }
+
+  private void checkContinuousQueries(String[] continuousQueryArray) throws SQLException {
+    boolean hasResult = statement.execute("show continuous queries");
+    Assert.assertTrue(hasResult);
+
+    List<String> resultList = new ArrayList<>();
+    try (ResultSet resultSet = statement.getResultSet()) {
+      while (resultSet.next()) {
+        String cq = resultSet.getString("cq name");
+        resultList.add(cq);
+      }
+    }
+    Assert.assertEquals(continuousQueryArray.length, resultList.size());
+
+    List<String> collect =
+        resultList.stream()
+            .sorted(Comparator.comparingInt(e -> e.split("\\.").length))
+            .collect(Collectors.toList());
+
+    for (String s : continuousQueryArray) {
+      Assert.assertTrue(collect.contains(s));
+    }
+  }
+
+  private void checkTimeSeries(String[] timeSeriesArray) throws SQLException {
+    boolean hasResult = statement.execute("show timeseries");
+    Assert.assertTrue(hasResult);
+
+    List<String> resultList = new ArrayList<>();
+    try (ResultSet resultSet = statement.getResultSet()) {
+      while (resultSet.next()) {
+        String timeseries = resultSet.getString("timeseries");
+        resultList.add(timeseries);
+      }
+    }
+    Assert.assertEquals(timeSeriesArray.length, resultList.size());
+
+    List<String> collect =
+        resultList.stream()
+            .sorted(Comparator.comparingInt(e -> e.split("\\.").length))
+            .collect(Collectors.toList());
+
+    for (String s : timeSeriesArray) {
+      Assert.assertTrue(collect.contains(s));
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
index 7910c8c..56d5708 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
@@ -38,14 +38,17 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
 import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowContinuousQueriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
 import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
@@ -1231,4 +1234,219 @@ public class PhysicalPlanTest {
     Assert.assertTrue(plan.isQuery());
     Assert.assertEquals(ShowContentType.TRIGGERS, plan.getShowContentType());
   }
+
+  @Test
+  public void testCreateCQ1() throws QueryProcessException {
+    String sql =
+        "CREATE CONTINUOUS QUERY cq1 BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* GROUP BY time(10s) END";
+
+    CreateContinuousQueryPlan plan =
+        (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals("cq1", plan.getContinuousQueryName());
+    Assert.assertEquals(10000, plan.getEveryInterval());
+    Assert.assertEquals(10000, plan.getForInterval());
+    Assert.assertEquals(
+        "root.${1}.${2}.${3}.${4}.temperature_max", plan.getTargetPath().getFullPath());
+    Assert.assertEquals(
+        "select max_value(temperature) from root.ln.*.*.* group by ([now() - 10s, now()), 10s)",
+        plan.getQuerySql());
+  }
+
+  @Test
+  public void testCreateCQ2() throws QueryProcessException {
+    String sql =
+        "CREATE CONTINUOUS QUERY cq1 BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+
+    CreateContinuousQueryPlan plan =
+        (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals("cq1", plan.getContinuousQueryName());
+    Assert.assertEquals(10000, plan.getEveryInterval());
+    Assert.assertEquals(10000, plan.getForInterval());
+    Assert.assertEquals("root.${1}.${2}.${3}.temperature_max", plan.getTargetPath().getFullPath());
+    Assert.assertEquals(
+        "select max_value(temperature) from root.ln.*.*.* group by ([now() - 10s, now()), 10s), level = 3",
+        plan.getQuerySql());
+  }
+
+  @Test
+  public void testCreateCQ3() throws QueryProcessException {
+    String sql =
+        "CREATE CONTINUOUS QUERY cq1 RESAMPLE EVERY 20s BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+
+    CreateContinuousQueryPlan plan =
+        (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals("cq1", plan.getContinuousQueryName());
+    Assert.assertEquals(20000, plan.getEveryInterval());
+    Assert.assertEquals(10000, plan.getForInterval());
+    Assert.assertEquals("root.${1}.${2}.${3}.temperature_max", plan.getTargetPath().getFullPath());
+    Assert.assertEquals(
+        "select max_value(temperature) from root.ln.*.*.* group by ([now() - 10s, now()), 10s), level = 3",
+        plan.getQuerySql());
+  }
+
+  @Test
+  public void testCreateCQ4() throws QueryProcessException {
+    String sql =
+        "CREATE CONTINUOUS QUERY cq1 RESAMPLE EVERY 20s FOR 10s BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+
+    CreateContinuousQueryPlan plan =
+        (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals("cq1", plan.getContinuousQueryName());
+    Assert.assertEquals(20000, plan.getEveryInterval());
+    Assert.assertEquals(10000, plan.getForInterval());
+    Assert.assertEquals("root.${1}.${2}.${3}.temperature_max", plan.getTargetPath().getFullPath());
+    Assert.assertEquals(
+        "select max_value(temperature) from root.ln.*.*.* group by ([now() - 10s, now()), 10s), level = 3",
+        plan.getQuerySql());
+  }
+
+  @Test
+  public void testCreateCQ5() throws QueryProcessException {
+    String sql =
+        "CREATE CONTINUOUS QUERY cq1 RESAMPLE FOR 20s BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+
+    CreateContinuousQueryPlan plan =
+        (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals("cq1", plan.getContinuousQueryName());
+    Assert.assertEquals(10000, plan.getEveryInterval());
+    Assert.assertEquals(20000, plan.getForInterval());
+    Assert.assertEquals("root.${1}.${2}.${3}.temperature_max", plan.getTargetPath().getFullPath());
+    Assert.assertEquals(
+        "select max_value(temperature) from root.ln.*.*.* group by ([now() - 20s, now()), 10s), level = 3",
+        plan.getQuerySql());
+  }
+
+  @Test
+  public void testCreateCQ6() throws QueryProcessException {
+
+    String sql =
+        "CREATE CQ cq1 RESAMPLE FOR 20s BEGIN SELECT max_value(temperature) INTO root.${a4}.${3}.temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+    try {
+      CreateContinuousQueryPlan plan =
+          (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+      fail();
+    } catch (SQLParserException e) {
+      assertEquals("CQ: x of ${x} should be an integer.", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreateCQ7() throws QueryProcessException {
+
+    String sql =
+        "CREATE CQ cq1 RESAMPLE FOR 20s BEGIN SELECT max_value(temperature) INTO root.${4}.${3}.temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+    try {
+      CreateContinuousQueryPlan plan =
+          (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+      fail();
+    } catch (SQLParserException e) {
+      assertEquals(
+          "CQ: x of ${x} should be greater than 0 and equal to or less than <level> or the length of queried path prefix.",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreateCQ8() throws QueryProcessException {
+    String sql =
+        "CREATE CONTINUOUS QUERY cq1 BEGIN SELECT max_value(temperature) INTO root.${1}_cq.${2}.${3}.temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+
+    CreateContinuousQueryPlan plan =
+        (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals("cq1", plan.getContinuousQueryName());
+    Assert.assertEquals(10000, plan.getEveryInterval());
+    Assert.assertEquals(10000, plan.getForInterval());
+    Assert.assertEquals(
+        "root.${1}_cq.${2}.${3}.temperature_max", plan.getTargetPath().getFullPath());
+    Assert.assertEquals(
+        "select max_value(temperature) from root.ln.*.*.* group by ([now() - 10s, now()), 10s), level = 3",
+        plan.getQuerySql());
+  }
+
+  @Test
+  public void testCreateCQ9() throws QueryProcessException {
+    String sql =
+        "CREATE CONTINUOUS QUERY cq1 BEGIN SELECT max_value(temperature) INTO ${1}.${0}.${2}.${3}.temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+    try {
+      CreateContinuousQueryPlan plan =
+          (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+      fail();
+    } catch (ParseCancellationException e) {
+      assertTrue(e.getMessage().contains("mismatched input '.' expecting FROM"));
+    }
+  }
+
+  @Test
+  public void testCreateCQ10() throws QueryProcessException {
+
+    String sql =
+        "CREATE CQ cq1 RESAMPLE FOR 20s BEGIN SELECT max_value(temperature) INTO root.${0}.${3}.temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+    try {
+      CreateContinuousQueryPlan plan =
+          (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+      fail();
+    } catch (SQLParserException e) {
+      assertEquals(
+          "CQ: x of ${x} should be greater than 0 and equal to or less than <level> or the length of queried path prefix.",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreateCQ11() throws QueryProcessException {
+
+    String sql =
+        "CREATE CQ cq1 RESAMPLE FOR 20s BEGIN SELECT max_value(temperature), avg(temperature) INTO root.${0}.${3}.temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END";
+    try {
+      CreateContinuousQueryPlan plan =
+          (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+      fail();
+    } catch (SQLParserException e) {
+      assertEquals("CQ: CQ currently does not support multiple result columns.", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreateCQ12() throws QueryProcessException {
+    long minEveryInterval =
+        IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval();
+    long everyInterval = minEveryInterval / 2;
+    String sql =
+        String.format(
+            "CREATE CQ cq1 RESAMPLE EVERY %dms FOR 20s BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* GROUP BY time(10s), level = 3 END",
+            everyInterval);
+    try {
+      CreateContinuousQueryPlan plan =
+          (CreateContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+      fail();
+    } catch (SQLParserException e) {
+      assertEquals(
+          "CQ: every interval should not be lower than the minimum value you configured.",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropCQ() throws QueryProcessException {
+    String sql = "DROP CONTINUOUS QUERY cq1";
+
+    DropContinuousQueryPlan plan = (DropContinuousQueryPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertFalse(plan.isQuery());
+    Assert.assertEquals("cq1", plan.getContinuousQueryName());
+  }
+
+  @Test
+  public void testShowCQs() throws QueryProcessException {
+    String sql = "SHOW CONTINUOUS QUERIES";
+
+    ShowContinuousQueriesPlan plan =
+        (ShowContinuousQueriesPlan) processor.parseSQLToPhysicalPlan(sql);
+    Assert.assertTrue(plan.isQuery());
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index fc29451..fd52e18 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -24,14 +24,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.cq.ContinuousQueryService;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
 import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.TriggerManagementException;
-import org.apache.iotdb.db.exception.UDFRegistrationException;
+import org.apache.iotdb.db.exception.*;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -92,7 +91,8 @@ public class EnvironmentUtils {
     try {
       UDFRegistrationService.getInstance().deregisterAll();
       TriggerRegistrationService.getInstance().deregisterAll();
-    } catch (UDFRegistrationException | TriggerManagementException e) {
+      ContinuousQueryService.getInstance().deregisterAll();
+    } catch (UDFRegistrationException | TriggerManagementException | ContinuousQueryException e) {
       fail(e.getMessage());
     }
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index e0bb90a..5bf62fb 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -50,6 +50,7 @@ public enum TSStatusCode {
   DUPLICATED_TEMPLATE(320),
   UNDEFINED_TEMPLATE(321),
   STORAGE_GROUP_NOT_EXIST(322),
+  CONTINUOUS_QUERY_ERROR(323),
 
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),
diff --git a/site/src/main/.vuepress/config.js b/site/src/main/.vuepress/config.js
index 9bff6c5..464e3bc 100644
--- a/site/src/main/.vuepress/config.js
+++ b/site/src/main/.vuepress/config.js
@@ -703,7 +703,8 @@ var config = {
 							['Advanced-Features/UDF-User-Defined-Function','UDF (User Defined Function)'],
 							// The trigger module has not been implemented yet,
 							// so the website should not show users how to use it to avoid misleading.
-							// ['Advanced-Features/Triggers','Trigger']
+							// ['Advanced-Features/Triggers','Trigger'],
+							['Advanced-Features/Continuous-Query','CQ (Continuous Query)'],
 						]
 					},
 					{
@@ -1491,9 +1492,10 @@ var config = {
 						title: '高级功能',
 						children: [
 							['Advanced-Features/UDF-User-Defined-Function','用户定义函数(UDF)'],
-              // The trigger module has not been implemented yet,
-              // so the website should not show users how to use it to avoid misleading.
-              // ['Advanced-Features/Triggers','触发器']
+						  // The trigger module has not been implemented yet,
+						  // so the website should not show users how to use it to avoid misleading.
+						  // ['Advanced-Features/Triggers','触发器'],
+							['Advanced-Features/Continuous-Query','连续查询(CQ)'],
 						]
 					},
 					{