You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/20 12:13:19 UTC

[iotdb] branch IOTDB-4619 updated (58289d6d0a -> c8f61e87a0)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 58289d6d0a require statementId from sessionId
     add e18af3de47 update construct method of pendingBatch. (#7548)
     add f33130b9fc [IOTDB-4653] add time precision units when setting TTL (#7639)
     add dbef84905a [IOTDB-4649] Fix the problem that constants which have same valueString but different types can not be distinguished. (#7619)
     add 08b6c1514b [IOTDB-4636] Add check to avoid flush empty chunk group (#7635)
     add 2cb85a9286 [IOTDB-4636] Fix IndexOutOfBoundsException when compacting aligned series (#7638)
     add aac98fec73 [IOTDB-3656] mpp load supports modification (#7354)
     add 5b91cecdeb [IOTDB-4680] fix error msg "%s" in load statement  (#7645)
     add a078e88daa [IOTDB-4256] Implement analyzer for SELECT INTO statement (#7588)
     add 79bfe4d123 [IOTDB-3462] Update ratis version to 2.4.0 (#7651)
     add 1b71c37754 [IOTDB-4647] Fix CSV import error when import header name with type (#7636)
     add 993782d2de [IOTDB-4679] Make MPPDataExchangeService use internal_address instead of rpc_address (#7648)
     add a639c09875 Added changes and user docs to rpc sqls (#7633)
     add 40779e48ad [IOTDB-4381] Implement Trigger fire process (#7355)
     add 84aa99226a [IOTDB-4683] Fix REJECT_THERSHOLD init error in SystemInfo (#7649)
     add 75d853837c [IOTDB-4250][IOTDB-4628] Support multiple pipes and update drop semantics (#7581)
     add 127263dd3b [IOTDB-4627]Trigger transfer (#7643)
     add f2ffb494a2 Perfect add/remove confignode process (#7656)
     add 80dca5c7bb [IOTDB-4688] use streaming md5 computing to replace blocking md5
     add b20f49525e [IOTDB-4689] Use seperate channel for heartbeat / appendEntries
     add 88903b3189 Make default timeout parameter in SessionIT from 1s to 60s (#7647)
     add fd8ced4f07 [IOTDB-4690] Add new configs for RatisConsensus (2.4.0)
     add 723ecb42cf [IOTDB-4534] Add IT for Trigger Execution (#7657)
     add 40571eb259 [IOTDB-4698]Implement interface of getLocationOfStatefulTrigger (#7663)
     add d0d2ec30a8 Disable StandaloneMppIT (#7667)
     new 9ee5295fe6 resolve conflicts
     new c8f61e87a0 Add IT for CQ Management and Execute & change into syntax

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/standalone-it-for-mpp.yml        |  22 +-
 .gitignore                                         |   1 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  20 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   1 +
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |  20 +-
 .../resources/conf/iotdb-confignode.properties     |  13 +-
 .../confignode/client/DataNodeRequestType.java     |   1 +
 .../client/async/AsyncDataNodeClientPool.java      |   7 +
 .../client/async/handlers/AsyncClientHandler.java  |   1 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  50 ++
 .../confignode/conf/ConfigNodeDescriptor.java      |  32 +-
 .../confignode/conf/ConfigNodeRemoveCheck.java     |   4 +-
 .../confignode/conf/SystemPropertiesUtils.java     |   2 +-
 .../consensus/request/ConfigPhysicalPlan.java      |  25 +-
 .../consensus/request/ConfigPhysicalPlanType.java  |   7 +-
 .../{GetRoutingPlan.java => GetRegionIdPlan.java}  |  10 +-
 ...ePlan.java => GetTransferringTriggersPlan.java} |   8 +-
 ...rTablePlan.java => GetTriggerLocationPlan.java} |  30 +-
 .../request/read/GetTriggerTablePlan.java          |  22 +-
 .../sync/DropPipePlan.java}                        |  28 +-
 .../write/trigger/UpdateTriggerLocationPlan.java   |  76 +++
 .../trigger/UpdateTriggersOnTransferNodesPlan.java |  75 +++
 .../{GetRoutingResp.java => GetRegionIdResp.java}  |  10 +-
 .../response/TransferringTriggersResp.java         |  26 +-
 ...etRoutingResp.java => TriggerLocationResp.java} |  30 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  61 ++-
 .../iotdb/confignode/manager/ConsensusManager.java |  12 +-
 .../apache/iotdb/confignode/manager/IManager.java  |  13 +-
 .../iotdb/confignode/manager/SyncManager.java      |   8 +-
 .../iotdb/confignode/manager/TriggerManager.java   | 105 ++++-
 .../iotdb/confignode/manager/load/LoadManager.java |   6 +-
 .../manager/load/balancer/RouteBalancer.java       |   6 +-
 .../manager/load/balancer/router/IRouter.java      |   2 +-
 .../load/balancer/router/LazyGreedyRouter.java     |   2 +-
 .../manager/load/balancer/router/LeaderRouter.java |   2 +-
 .../balancer/router/LoadScoreGreedyRouter.java     |   2 +-
 .../iotdb/confignode/manager/node/NodeManager.java |  43 +-
 .../manager/partition/PartitionManager.java        |   8 +-
 .../iotdb/confignode/persistence/TriggerInfo.java  |  67 ++-
 .../persistence/executor/ConfigPlanExecutor.java   |  27 +-
 .../persistence/partition/PartitionInfo.java       |  12 +-
 .../partition/StorageGroupPartitionTable.java      |   6 +-
 .../persistence/sync/ClusterSyncInfo.java          |  43 +-
 .../confignode/procedure/ProcedureExecutor.java    |   2 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   3 -
 .../procedure/impl/CreateTriggerProcedure.java     |   5 +-
 .../impl/sync/AbstractOperatePipeProcedure.java    |   6 +-
 .../procedure/impl/sync/CreatePipeProcedure.java   |   3 +-
 .../procedure/impl/sync/DropPipeProcedure.java     |  12 +-
 .../iotdb/confignode/service/ConfigNode.java       |   7 +-
 .../confignode/service/ConfigNodeCommandLine.java  |   5 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  26 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  98 +++-
 .../load/balancer/router/LazyGreedyRouterTest.java |   8 +-
 .../load/balancer/router/LeaderRouterTest.java     |   6 +-
 .../balancer/router/LoadScoreGreedyRouterTest.java |   2 +-
 .../confignode1conf/iotdb-confignode.properties    |   3 +-
 .../confignode2conf/iotdb-confignode.properties    |   3 +-
 .../confignode3conf/iotdb-confignode.properties    |   1 +
 consensus/pom.xml                                  |   2 +-
 .../apache/iotdb/consensus/config/RatisConfig.java |  55 ++-
 .../multileader/logdispatcher/LogDispatcher.java   |  18 +-
 .../multileader/logdispatcher/PendingBatch.java    |  11 +-
 .../ratis/FileInfoWithDelayedMd5Computing.java     |  75 ---
 .../iotdb/consensus/ratis/RatisConsensus.java      |   7 +-
 .../iotdb/consensus/ratis/SnapshotStorage.java     |   2 +-
 .../org/apache/iotdb/consensus/ratis/Utils.java    |   8 +
 .../multileader/logdispatcher/SyncStatusTest.java  |  25 +-
 .../apache/iotdb/consensus/ratis/SnapshotTest.java |   3 +
 docs/UserGuide/Cluster/Cluster-Concept.md          |  18 +-
 docs/UserGuide/Cluster/Cluster-Setup.md            |  12 +-
 docs/UserGuide/Delete-Data/TTL.md                  |  16 +-
 .../Maintenance-Tools/Maintenance-Command.md       | 100 +++-
 docs/UserGuide/Process-Data/Continuous-Query.md    |  32 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |  12 +-
 docs/zh/UserGuide/Delete-Data/TTL.md               |  18 +-
 .../Maintenance-Tools/Maintenance-Command.md       |  99 +++-
 docs/zh/UserGuide/Process-Data/Continuous-Query.md |  18 +-
 .../iotdb/trigger/ClusterAlertingExample.java      |   8 +-
 .../org/apache/iotdb/trigger/LoggerTrigger.java    |  86 ++++
 integration-test/import-control.xml                |   2 +-
 .../trigger/example/TriggerFireTimesCounter.java   |  87 ++++
 .../java/org/apache/iotdb/it/env/MppConfig.java    |  11 +-
 .../org/apache/iotdb/it/utils/TsFileGenerator.java | 232 +++++++++
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +-
 .../confignode/it/IoTDBClusterPartitionIT.java     |  63 +--
 .../confignode/it/IoTDBConfigNodeSnapshotIT.java   |   4 +-
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  | 340 +++++---------
 .../java/org/apache/iotdb/db/it/IoTDBFilterIT.java |  24 +-
 .../org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java   | 459 ++++++++++++++++++
 .../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java  | 520 +++++++++++++++++++++
 .../org/apache/iotdb/db/it/sync/IoTDBPipeIT.java   |  69 ++-
 .../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java   |   7 +-
 .../db/it/trigger/IoTDBTriggerExecutionIT.java     | 265 +++++++++++
 .../iotdb/session/it/IoTDBSessionInsertNulIT.java  |   2 +-
 .../IoTDBSessionInsertWithTriggerExecutionIT.java  | 337 +++++++++++++
 .../src/test/resources/TriggerFireTimesCounter.jar | Bin 0 -> 1324 bytes
 ...IoTDBLoadExternalTsFileWithTimePartitionIT.java | 329 -------------
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   5 +
 ...ception.java => PipeAlreadyExistException.java} |  12 +-
 .../exception/sync/PipeNotExistException.java      |   4 -
 .../sync/PipeSinkAlreadyExistException.java}       |  16 +-
 ...eption.java => PipeSinkBeingUsedException.java} |  12 +-
 .../sync/PipeSinkNotExistException.java}           |  16 +-
 .../commons/executable/ExecutableManager.java      |  11 +
 .../commons/partition/DataPartitionTable.java      |   4 +-
 .../commons/partition/SchemaPartitionTable.java    |   2 +-
 .../commons/partition/SeriesPartitionTable.java    |   2 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |  13 +
 .../iotdb/commons/sync/metadata/SyncMetadata.java  | 122 ++---
 .../commons/sync/persistence/SyncLogReader.java    |  31 +-
 .../apache/iotdb/commons/sync/pipe/PipeStatus.java |   1 -
 .../iotdb/commons/trigger/TriggerInformation.java  |  15 +
 .../apache/iotdb/commons/trigger/TriggerTable.java |  45 ++
 .../commons/sync/metedata/SyncMetadataTest.java    | 189 ++++++++
 .../resources/conf/iotdb-datanode.properties       |  24 +
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  43 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  53 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  17 +
 .../db/consensus/DataRegionConsensusImpl.java      |  10 +
 .../db/consensus/SchemaRegionConsensusImpl.java    |  10 +
 .../impl/ReadChunkCompactionPerformer.java         |  21 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  10 +-
 .../iotdb/db/engine/load/AlignedChunkData.java     |   9 +-
 .../org/apache/iotdb/db/engine/load/ChunkData.java |  14 +-
 .../apache/iotdb/db/engine/load/DeletionData.java  |  72 +++
 .../iotdb/db/engine/load/LoadTsFileManager.java    |  29 +-
 .../iotdb/db/engine/load/NonAlignedChunkData.java  |   1 +
 .../load/{ChunkData.java => TsFileData.java}       |  42 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  11 +-
 .../db/engine/storagegroup/TsFileManager.java      |   5 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   6 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |   5 +-
 .../db/mpp/common/header/ColumnHeaderConstant.java |  37 +-
 .../db/mpp/common/header/DatasetHeaderFactory.java |  10 +-
 .../execution/exchange/MPPDataExchangeService.java |   8 +-
 .../execution/executor/RegionWriteExecutor.java    |  36 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  29 ++
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 128 ++++-
 .../iotdb/db/mpp/plan/analyze/SelectIntoUtils.java | 105 +++++
 .../db/mpp/plan/execution/QueryExecution.java      |  17 +-
 .../plan/execution/config/ConfigTaskVisitor.java   |   9 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  30 +-
 .../config/executor/IConfigTaskExecutor.java       |   4 +-
 .../executor/StandaloneConfigTaskExecutor.java     |   4 +-
 .../{GetRegionTask.java => GetRegionIdTask.java}   |  22 +-
 .../mpp/plan/expression/leaf/ConstantOperand.java  |   4 +-
 .../plan/expression/leaf/TimeSeriesOperand.java    |   9 +
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 111 ++++-
 .../plan/node/load/LoadSingleTsFileNode.java       |  69 ++-
 .../planner/plan/node/load/LoadTsFileNode.java     |   3 +
 .../plan/node/load/LoadTsFilePieceNode.java        |  35 +-
 .../parameter/DeviceViewIntoPathDescriptor.java    | 208 +++++++++
 .../planner/plan/parameter/IntoPathDescriptor.java | 167 +++++++
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   7 +
 .../db/mpp/plan/statement/StatementVisitor.java    |   6 +-
 .../plan/statement/component/IntoComponent.java    | 237 ++++++++++
 .../db/mpp/plan/statement/component/IntoItem.java  |  73 +++
 .../plan/statement/crud/LoadTsFileStatement.java   |   4 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |  38 ++
 ...ionStatement.java => GetRegionIdStatement.java} |   6 +-
 .../dag/input/ConstantInputReader.java             |   3 +-
 .../transformation/dag/util/TransformUtils.java    |   3 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  34 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   7 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 -
 .../impl/DataNodeInternalRPCServiceImpl.java       |  52 +++
 .../java/org/apache/iotdb/db/sync/SyncService.java | 170 ++++---
 .../db/sync/common/ClusterSyncInfoFetcher.java     |   7 +-
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |   4 +-
 .../apache/iotdb/db/sync/common/LocalSyncInfo.java |  22 +-
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |  11 +-
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |   7 +
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  49 +-
 .../db/sync/transport/client/SenderManager.java    |   2 +
 .../iotdb/db/trigger/executor/TriggerExecutor.java |  41 +-
 .../db/trigger/executor/TriggerFireResult.java     |  57 +++
 .../db/trigger/executor/TriggerFireVisitor.java    | 423 +++++++++++++++++
 .../trigger/service/TriggerManagementService.java  |  85 ++++
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |   2 +-
 .../ReadChunkCompactionPerformerAlignedTest.java   |  79 ++++
 .../db/engine/storagegroup/DataRegionTest.java     |  40 ++
 .../db/mpp/common/schematree/NodeRefTest.java      |  47 ++
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     | 169 +++++++
 .../manager => persistence}/LocalSyncInfoTest.java |  23 +-
 .../recovery => persistence}/SyncLogTest.java      |  36 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 .../src/main/thrift/confignode.thrift              |  71 ++-
 thrift/src/main/thrift/datanode.thrift             |  30 ++
 .../tsfile/common/constant/TsFileConstant.java     |   4 +
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |  12 +
 .../apache/iotdb/tsfile/write/record/Tablet.java   | 341 +++++++++++++-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   4 +
 .../iotdb/tsfile/write/record/TabletTest.java      |  65 +++
 194 files changed, 7188 insertions(+), 1536 deletions(-)
 rename confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/{GetRoutingPlan.java => GetRegionIdPlan.java} (94%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/{GetTriggerTablePlan.java => GetTransferringTriggersPlan.java} (83%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/{GetTriggerTablePlan.java => GetTriggerLocationPlan.java} (65%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/{read/GetTriggerTablePlan.java => write/sync/DropPipePlan.java} (66%)
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggerLocationPlan.java
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/trigger/UpdateTriggersOnTransferNodesPlan.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/{GetRoutingResp.java => GetRegionIdResp.java} (83%)
 copy node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/PipeNotExistException.java => confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/TransferringTriggersResp.java (57%)
 rename confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/{GetRoutingResp.java => TriggerLocationResp.java} (59%)
 delete mode 100644 consensus/src/main/java/org/apache/iotdb/consensus/ratis/FileInfoWithDelayedMd5Computing.java
 create mode 100644 example/trigger/src/main/java/org/apache/iotdb/trigger/LoggerTrigger.java
 create mode 100644 integration-test/src/main/java/org/apache/iotdb/db/trigger/example/TriggerFireTimesCounter.java
 create mode 100644 integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
 create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
 create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
 create mode 100644 integration-test/src/test/java/org/apache/iotdb/db/it/trigger/IoTDBTriggerExecutionIT.java
 create mode 100644 integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionInsertWithTriggerExecutionIT.java
 create mode 100644 integration-test/src/test/resources/TriggerFireTimesCounter.jar
 delete mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
 copy node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/{PipeNotExistException.java => PipeAlreadyExistException.java} (67%)
 copy node-commons/src/main/java/org/apache/iotdb/commons/{sync/pipe/PipeStatus.java => exception/sync/PipeSinkAlreadyExistException.java} (75%)
 copy node-commons/src/main/java/org/apache/iotdb/commons/exception/sync/{PipeNotExistException.java => PipeSinkBeingUsedException.java} (73%)
 copy node-commons/src/main/java/org/apache/iotdb/commons/{sync/pipe/PipeStatus.java => exception/sync/PipeSinkNotExistException.java} (76%)
 create mode 100644 node-commons/src/test/java/org/apache/iotdb/commons/sync/metedata/SyncMetadataTest.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/engine/load/DeletionData.java
 copy server/src/main/java/org/apache/iotdb/db/engine/load/{ChunkData.java => TsFileData.java} (53%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/{GetRegionTask.java => GetRegionIdTask.java} (81%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoItem.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/{GetRegionStatement.java => GetRegionIdStatement.java} (94%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireResult.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/mpp/common/schematree/NodeRefTest.java
 rename server/src/test/java/org/apache/iotdb/db/sync/{receiver/manager => persistence}/LocalSyncInfoTest.java (83%)
 rename server/src/test/java/org/apache/iotdb/db/sync/{receiver/recovery => persistence}/SyncLogTest.java (83%)
 create mode 100644 tsfile/src/test/java/org/apache/iotdb/tsfile/write/record/TabletTest.java


[iotdb] 01/02: resolve conflicts

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9ee5295fe61a69d702b381c5febe6436a3e14a20
Merge: 58289d6d0a d0d2ec30a8
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Oct 20 14:02:01 2022 +0800

    resolve conflicts

 .github/workflows/standalone-it-for-mpp.yml        |  22 +-
 .gitignore                                         |   1 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  20 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   1 +
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |  20 +-
 .../resources/conf/iotdb-confignode.properties     |  13 +-
 .../confignode/client/DataNodeRequestType.java     |   1 +
 .../client/async/AsyncDataNodeClientPool.java      |   7 +
 .../client/async/handlers/AsyncClientHandler.java  |   1 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  50 +++
 .../confignode/conf/ConfigNodeDescriptor.java      |  32 +-
 .../confignode/conf/ConfigNodeRemoveCheck.java     |   4 +-
 .../confignode/conf/SystemPropertiesUtils.java     |   2 +-
 .../consensus/request/ConfigPhysicalPlan.java      |  25 +-
 .../consensus/request/ConfigPhysicalPlanType.java  |   7 +-
 .../{GetRoutingPlan.java => GetRegionIdPlan.java}  |  10 +-
 ...ePlan.java => GetTransferringTriggersPlan.java} |   8 +-
 ...rTablePlan.java => GetTriggerLocationPlan.java} |  30 +-
 .../request/read/GetTriggerTablePlan.java          |  22 +-
 .../sync/DropPipePlan.java}                        |  28 +-
 .../write/trigger/UpdateTriggerLocationPlan.java   |  76 ++++
 .../trigger/UpdateTriggersOnTransferNodesPlan.java |  75 ++++
 .../{GetRoutingResp.java => GetRegionIdResp.java}  |  10 +-
 .../response/TransferringTriggersResp.java         |  26 +-
 ...etRoutingResp.java => TriggerLocationResp.java} |  30 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  61 ++-
 .../iotdb/confignode/manager/ConsensusManager.java |  12 +-
 .../apache/iotdb/confignode/manager/IManager.java  |  13 +-
 .../iotdb/confignode/manager/SyncManager.java      |   8 +-
 .../iotdb/confignode/manager/TriggerManager.java   | 105 ++++-
 .../iotdb/confignode/manager/load/LoadManager.java |   6 +-
 .../manager/load/balancer/RouteBalancer.java       |   6 +-
 .../manager/load/balancer/router/IRouter.java      |   2 +-
 .../load/balancer/router/LazyGreedyRouter.java     |   2 +-
 .../manager/load/balancer/router/LeaderRouter.java |   2 +-
 .../balancer/router/LoadScoreGreedyRouter.java     |   2 +-
 .../iotdb/confignode/manager/node/NodeManager.java |  43 ++-
 .../manager/partition/PartitionManager.java        |   8 +-
 .../iotdb/confignode/persistence/TriggerInfo.java  |  67 +++-
 .../persistence/executor/ConfigPlanExecutor.java   |  27 +-
 .../persistence/partition/PartitionInfo.java       |  12 +-
 .../partition/StorageGroupPartitionTable.java      |   6 +-
 .../persistence/sync/ClusterSyncInfo.java          |  43 +--
 .../confignode/procedure/ProcedureExecutor.java    |   2 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   3 -
 .../procedure/impl/CreateTriggerProcedure.java     |   5 +-
 .../impl/sync/AbstractOperatePipeProcedure.java    |   6 +-
 .../procedure/impl/sync/CreatePipeProcedure.java   |   3 +-
 .../procedure/impl/sync/DropPipeProcedure.java     |  12 +-
 .../iotdb/confignode/service/ConfigNode.java       |   7 +-
 .../confignode/service/ConfigNodeCommandLine.java  |   5 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  26 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  98 ++++-
 .../load/balancer/router/LazyGreedyRouterTest.java |   8 +-
 .../load/balancer/router/LeaderRouterTest.java     |   6 +-
 .../balancer/router/LoadScoreGreedyRouterTest.java |   2 +-
 .../confignode1conf/iotdb-confignode.properties    |   3 +-
 .../confignode2conf/iotdb-confignode.properties    |   3 +-
 .../confignode3conf/iotdb-confignode.properties    |   1 +
 consensus/pom.xml                                  |   2 +-
 .../apache/iotdb/consensus/config/RatisConfig.java |  55 ++-
 .../multileader/logdispatcher/LogDispatcher.java   |  18 +-
 .../multileader/logdispatcher/PendingBatch.java    |  11 +-
 .../ratis/FileInfoWithDelayedMd5Computing.java     |  75 ----
 .../iotdb/consensus/ratis/RatisConsensus.java      |   7 +-
 .../iotdb/consensus/ratis/SnapshotStorage.java     |   2 +-
 .../org/apache/iotdb/consensus/ratis/Utils.java    |   8 +
 .../multileader/logdispatcher/SyncStatusTest.java  |  25 +-
 .../apache/iotdb/consensus/ratis/SnapshotTest.java |   3 +
 docs/UserGuide/Cluster/Cluster-Concept.md          |  18 +-
 docs/UserGuide/Cluster/Cluster-Setup.md            |  12 +-
 docs/UserGuide/Delete-Data/TTL.md                  |  16 +-
 .../Maintenance-Tools/Maintenance-Command.md       | 100 ++++-
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |  12 +-
 docs/zh/UserGuide/Delete-Data/TTL.md               |  18 +-
 .../Maintenance-Tools/Maintenance-Command.md       |  99 ++++-
 .../iotdb/trigger/ClusterAlertingExample.java      |   8 +-
 .../org/apache/iotdb/trigger/LoggerTrigger.java    |  86 +++++
 integration-test/import-control.xml                |   2 +-
 .../trigger/example/TriggerFireTimesCounter.java   |  87 +++++
 .../java/org/apache/iotdb/it/env/MppConfig.java    |  11 +-
 .../org/apache/iotdb/it/utils/TsFileGenerator.java | 232 +++++++++++
 .../org/apache/iotdb/itbase/env/BaseConfig.java    |   8 +-
 .../confignode/it/IoTDBClusterPartitionIT.java     |  63 +--
 .../confignode/it/IoTDBConfigNodeSnapshotIT.java   |   4 +-
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  | 340 +++++++----------
 .../java/org/apache/iotdb/db/it/IoTDBFilterIT.java |  24 +-
 .../org/apache/iotdb/db/it/sync/IoTDBPipeIT.java   |  69 +++-
 .../apache/iotdb/db/it/sync/IoTDBPipeSinkIT.java   |   7 +-
 .../db/it/trigger/IoTDBTriggerExecutionIT.java     | 265 +++++++++++++
 .../iotdb/session/it/IoTDBSessionInsertNulIT.java  |   2 +-
 .../IoTDBSessionInsertWithTriggerExecutionIT.java  | 337 ++++++++++++++++
 .../src/test/resources/TriggerFireTimesCounter.jar | Bin 0 -> 1324 bytes
 ...IoTDBLoadExternalTsFileWithTimePartitionIT.java | 329 ----------------
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   5 +
 ...ception.java => PipeAlreadyExistException.java} |  12 +-
 .../exception/sync/PipeNotExistException.java      |   4 -
 .../sync/PipeSinkAlreadyExistException.java}       |  16 +-
 ...eption.java => PipeSinkBeingUsedException.java} |  12 +-
 .../sync/PipeSinkNotExistException.java}           |  16 +-
 .../commons/executable/ExecutableManager.java      |  11 +
 .../commons/partition/DataPartitionTable.java      |   4 +-
 .../commons/partition/SchemaPartitionTable.java    |   2 +-
 .../commons/partition/SeriesPartitionTable.java    |   2 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |  13 +
 .../iotdb/commons/sync/metadata/SyncMetadata.java  | 122 +++---
 .../commons/sync/persistence/SyncLogReader.java    |  31 +-
 .../apache/iotdb/commons/sync/pipe/PipeStatus.java |   1 -
 .../iotdb/commons/trigger/TriggerInformation.java  |  15 +
 .../apache/iotdb/commons/trigger/TriggerTable.java |  45 +++
 .../commons/sync/metedata/SyncMetadataTest.java    | 189 +++++++++
 .../resources/conf/iotdb-datanode.properties       |  24 ++
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  43 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  53 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  17 +
 .../db/consensus/DataRegionConsensusImpl.java      |  10 +
 .../db/consensus/SchemaRegionConsensusImpl.java    |  10 +
 .../impl/ReadChunkCompactionPerformer.java         |  21 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  10 +-
 .../iotdb/db/engine/load/AlignedChunkData.java     |   9 +-
 .../org/apache/iotdb/db/engine/load/ChunkData.java |  14 +-
 .../apache/iotdb/db/engine/load/DeletionData.java  |  72 ++++
 .../iotdb/db/engine/load/LoadTsFileManager.java    |  29 +-
 .../iotdb/db/engine/load/NonAlignedChunkData.java  |   1 +
 .../load/{ChunkData.java => TsFileData.java}       |  42 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  11 +-
 .../db/engine/storagegroup/TsFileManager.java      |   5 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   6 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |   5 +-
 .../db/mpp/common/header/ColumnHeaderConstant.java |  37 +-
 .../db/mpp/common/header/DatasetHeaderFactory.java |  10 +-
 .../execution/exchange/MPPDataExchangeService.java |   8 +-
 .../execution/executor/RegionWriteExecutor.java    |  36 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  29 ++
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 128 ++++++-
 .../iotdb/db/mpp/plan/analyze/SelectIntoUtils.java | 105 +++++
 .../db/mpp/plan/execution/QueryExecution.java      |  17 +-
 .../plan/execution/config/ConfigTaskVisitor.java   |   9 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  30 +-
 .../config/executor/IConfigTaskExecutor.java       |   4 +-
 .../executor/StandaloneConfigTaskExecutor.java     |   4 +-
 .../{GetRegionTask.java => GetRegionIdTask.java}   |  22 +-
 .../mpp/plan/expression/leaf/ConstantOperand.java  |   4 +-
 .../plan/expression/leaf/TimeSeriesOperand.java    |   9 +
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 111 +++++-
 .../plan/node/load/LoadSingleTsFileNode.java       |  69 +++-
 .../planner/plan/node/load/LoadTsFileNode.java     |   3 +
 .../plan/node/load/LoadTsFilePieceNode.java        |  35 +-
 .../parameter/DeviceViewIntoPathDescriptor.java    | 208 ++++++++++
 .../planner/plan/parameter/IntoPathDescriptor.java | 167 ++++++++
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   7 +
 .../db/mpp/plan/statement/StatementVisitor.java    |   6 +-
 .../plan/statement/component/IntoComponent.java    | 237 ++++++++++++
 .../db/mpp/plan/statement/component/IntoItem.java  |  73 ++++
 .../plan/statement/crud/LoadTsFileStatement.java   |   4 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |  38 ++
 ...ionStatement.java => GetRegionIdStatement.java} |   6 +-
 .../dag/input/ConstantInputReader.java             |   3 +-
 .../transformation/dag/util/TransformUtils.java    |   3 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  34 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   7 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 -
 .../impl/DataNodeInternalRPCServiceImpl.java       |  52 +++
 .../java/org/apache/iotdb/db/sync/SyncService.java | 170 ++++-----
 .../db/sync/common/ClusterSyncInfoFetcher.java     |   7 +-
 .../iotdb/db/sync/common/ISyncInfoFetcher.java     |   4 +-
 .../apache/iotdb/db/sync/common/LocalSyncInfo.java |  22 +-
 .../iotdb/db/sync/common/LocalSyncInfoFetcher.java |  11 +-
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |   7 +
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  49 ++-
 .../db/sync/transport/client/SenderManager.java    |   2 +
 .../iotdb/db/trigger/executor/TriggerExecutor.java |  41 +-
 .../db/trigger/executor/TriggerFireResult.java     |  57 +++
 .../db/trigger/executor/TriggerFireVisitor.java    | 423 +++++++++++++++++++++
 .../trigger/service/TriggerManagementService.java  |  85 +++++
 .../apache/iotdb/db/utils/sync/SyncPipeUtil.java   |   2 +-
 .../ReadChunkCompactionPerformerAlignedTest.java   |  79 ++++
 .../db/engine/storagegroup/DataRegionTest.java     |  40 ++
 .../db/mpp/common/schematree/NodeRefTest.java      |  47 +++
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     | 169 ++++++++
 .../manager => persistence}/LocalSyncInfoTest.java |  23 +-
 .../recovery => persistence}/SyncLogTest.java      |  36 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 .../src/main/thrift/confignode.thrift              |  71 ++--
 thrift/src/main/thrift/datanode.thrift             |  30 ++
 .../tsfile/common/constant/TsFileConstant.java     |   4 +
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |  12 +
 .../apache/iotdb/tsfile/write/record/Tablet.java   | 341 ++++++++++++++++-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   4 +
 .../iotdb/tsfile/write/record/TabletTest.java      |  65 ++++
 190 files changed, 6184 insertions(+), 1511 deletions(-)

diff --cc confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index bc7f9f6ce0,f2a57d462a..488cb0298d
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@@ -475,38 -473,38 +475,68 @@@ public class ConfigNodeDescriptor 
              properties.getProperty(
                  "schema_region_ratis_max_sleep_time_ms",
                  String.valueOf(conf.getSchemaRegionRatisMaxSleepTimeMs()))));
+ 
+     conf.setPartitionRegionRatisPreserveLogsWhenPurge(
+         Long.parseLong(
+             properties.getProperty(
+                 "partition_region_ratis_preserve_logs_num_when_purge",
+                 String.valueOf(conf.getPartitionRegionRatisPreserveLogsWhenPurge()))));
+ 
+     conf.setSchemaRegionRatisPreserveLogsWhenPurge(
+         Long.parseLong(
+             properties.getProperty(
+                 "schema_region_ratis_preserve_logs_num_when_purge",
+                 String.valueOf(conf.getSchemaRegionRatisPreserveLogsWhenPurge()))));
+ 
+     conf.setDataRegionRatisPreserveLogsWhenPurge(
+         Long.parseLong(
+             properties.getProperty(
+                 "data_region_ratis_preserve_logs_num_when_purge",
+                 String.valueOf(conf.getDataRegionRatisPreserveLogsWhenPurge()))));
+ 
+     conf.setRatisFirstElectionTimeoutMinMs(
+         Long.parseLong(
+             properties.getProperty(
+                 "ratis_first_election_timeout_min_ms",
+                 String.valueOf(conf.getRatisFirstElectionTimeoutMinMs()))));
+ 
+     conf.setRatisFirstElectionTimeoutMaxMs(
+         Long.parseLong(
+             properties.getProperty(
+                 "ratis_first_election_timeout_max_ms",
+                 String.valueOf(conf.getRatisFirstElectionTimeoutMaxMs()))));
    }
  
 +  private void loadCQConfig(Properties properties) {
 +    int cqSubmitThread =
 +        Integer.parseInt(
 +            properties.getProperty(
 +                "continuous_query_submit_thread", String.valueOf(conf.getCqSubmitThread())));
 +    if (cqSubmitThread <= 0) {
 +      LOGGER.warn(
 +          "continuous_query_submit_thread should be greater than 0, but current value is {}, ignore that and use the default value {}",
 +          cqSubmitThread,
 +          conf.getCqSubmitThread());
 +      cqSubmitThread = conf.getCqSubmitThread();
 +    }
 +    conf.setCqSubmitThread(cqSubmitThread);
 +
 +    long cqMinEveryIntervalInMs =
 +        Long.parseLong(
 +            properties.getProperty(
 +                "continuous_query_min_every_interval_in_ms",
 +                String.valueOf(conf.getCqMinEveryIntervalInMs())));
 +    if (cqMinEveryIntervalInMs <= 0) {
 +      LOGGER.warn(
 +          "continuous_query_min_every_interval_in_ms should be greater than 0, but current value is {}, ignore that and use the default value {}",
 +          cqMinEveryIntervalInMs,
 +          conf.getCqMinEveryIntervalInMs());
 +      cqMinEveryIntervalInMs = conf.getCqMinEveryIntervalInMs();
 +    }
 +
 +    conf.setCqMinEveryIntervalInMs(cqMinEveryIntervalInMs);
 +  }
 +
    /**
     * Check if the current ConfigNode is SeedConfigNode.
     *
diff --cc confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index bf5a438f47,c38ec65dae..b7226dd199
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@@ -304,20 -310,14 +315,28 @@@ public abstract class ConfigPhysicalPla
          case GetSeriesSlotList:
            req = new GetSeriesSlotListPlan();
            break;
+         case UpdateTriggersOnTransferNodes:
+           req = new UpdateTriggersOnTransferNodesPlan();
+           break;
+         case UpdateTriggerLocation:
+           req = new UpdateTriggerLocationPlan();
+           break;
+         case GetTransferringTriggers:
+           req = new GetTransferringTriggersPlan();
 +        case ACTIVE_CQ:
 +          req = new ActiveCQPlan();
 +          break;
 +        case ADD_CQ:
 +          req = new AddCQPlan();
 +          break;
 +        case DROP_CQ:
 +          req = new DropCQPlan();
 +          break;
 +        case UPDATE_CQ_LAST_EXEC_TIME:
 +          req = new UpdateCQLastExecTimePlan();
 +          break;
 +        case SHOW_CQ:
 +          req = new ShowCQPlan();
            break;
          default:
            throw new IOException("unknown PhysicalPlan type: " + typeNum);
diff --cc confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index be8fad06e7,f2492a1766..8bcb95c125
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@@ -89,12 -90,11 +90,16 @@@ public enum ConfigPhysicalPlanType 
    GetTriggerTable,
    UpdateTriggerStateInTable,
    GetTriggerJar,
-   GetRouting,
+   GetRegionId,
    GetSeriesSlotList,
    GetTimeSlotList,
+   UpdateTriggersOnTransferNodes,
+   UpdateTriggerLocation,
+   GetTransferringTriggers,
 -  GetTriggerLocation
++  GetTriggerLocation,
 +  DROP_CQ,
 +  ACTIVE_CQ,
 +  ADD_CQ,
 +  UPDATE_CQ_LAST_EXEC_TIME,
 +  SHOW_CQ
  }
diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index eb7973a871,1f76144618..e3625bd3dd
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@@ -94,9 -92,9 +95,10 @@@ import org.apache.iotdb.confignode.rpc.
  import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
  import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 +import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+ import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 3d7e055e41,a0f3ce3041..5007536503
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@@ -55,9 -53,9 +55,10 @@@ import org.apache.iotdb.confignode.rpc.
  import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
  import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 +import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
  import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
+ import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
  import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index 7fc4b3ba8b,e56e4475e1..a30f834235
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@@ -46,7 -59,7 +59,8 @@@ import org.slf4j.LoggerFactory
  import java.io.IOException;
  import java.util.Collections;
  import java.util.List;
+ import java.util.Map;
 +import java.util.Optional;
  
  public class TriggerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
diff --cc confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 2ef6251d36,7162762a87..875017c8f2
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@@ -829,16 -831,35 +842,40 @@@ public class NodeManager 
            }
          });
  
 -    LOGGER.info(
 -        "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore);
 -    return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
 +    if (result.get() == -1) {
 +      return Optional.empty();
 +    } else {
 +      LOGGER.info(
 +          "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore);
 +      return Optional.of(
 +          configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get()));
 +    }
    }
  
+   /**
+    * Get the DataNodeLocation of the lowest load DataNode in input
+    *
+    * @return TDataNodeLocation
+    */
+   public TDataNodeLocation getLowestLoadDataNode(Set<Integer> nodes) {
+     AtomicInteger result = new AtomicInteger();
+     AtomicLong lowestLoadScore = new AtomicLong(Long.MAX_VALUE);
+ 
+     nodes.forEach(
+         nodeID -> {
+           BaseNodeCache cache = nodeCacheMap.get(nodeID);
+           long score = (cache == null) ? Long.MAX_VALUE : cache.getLoadScore();
+           if (score < lowestLoadScore.get()) {
+             result.set(nodeID);
+             lowestLoadScore.set(score);
+           }
+         });
+ 
+     LOGGER.info(
+         "get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", result, lowestLoadScore);
+     return configManager.getNodeManager().getRegisteredDataNodeLocations().get(result.get());
+   }
+ 
    public boolean isNodeRemoving(int dataNodeId) {
      DataNodeHeartbeatCache cache =
          (DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
diff --cc confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index fd6b13d486,e5815cd695..7af57e0ff8
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@@ -291,15 -294,9 +306,17 @@@ public class ConfigPlanExecutor 
        case PreCreatePipe:
          return syncInfo.preCreatePipe((PreCreatePipePlan) physicalPlan);
        case SetPipeStatus:
-         return syncInfo.operatePipe((SetPipeStatusPlan) physicalPlan);
+         return syncInfo.setPipeStatus((SetPipeStatusPlan) physicalPlan);
+       case DropPipe:
+         return syncInfo.dropPipe((DropPipePlan) physicalPlan);
 +      case ADD_CQ:
 +        return cqInfo.addCQ((AddCQPlan) physicalPlan);
 +      case DROP_CQ:
 +        return cqInfo.dropCQ((DropCQPlan) physicalPlan);
 +      case ACTIVE_CQ:
 +        return cqInfo.activeCQ((ActiveCQPlan) physicalPlan);
 +      case UPDATE_CQ_LAST_EXEC_TIME:
 +        return cqInfo.updateCQLastExecutionTime((UpdateCQLastExecTimePlan) physicalPlan);
        default:
          throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
      }
diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6913df7cb0,c17ea193c3..20c624583f
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@@ -1964,12 -1974,14 +1975,18 @@@ public class IoTDBDescriptor 
      conf.setSchemaRatisConsensusMaxRetryAttempts(ratisConfig.getSchemaMaxRetryAttempts());
      conf.setSchemaRatisConsensusInitialSleepTimeMs(ratisConfig.getSchemaInitialSleepTime());
      conf.setSchemaRatisConsensusMaxSleepTimeMs(ratisConfig.getSchemaMaxSleepTime());
+ 
+     conf.setDataRatisConsensusPreserveWhenPurge(ratisConfig.getDataPreserveWhenPurge());
+     conf.setSchemaRatisConsensusPreserveWhenPurge(ratisConfig.getSchemaPreserveWhenPurge());
+ 
+     conf.setRatisFirstElectionTimeoutMinMs(ratisConfig.getFirstElectionTimeoutMin());
+     conf.setRatisFirstElectionTimeoutMaxMs(ratisConfig.getFirstElectionTimeoutMax());
    }
  
 +  public void loadCQConfig(TCQConfig cqConfig) {
 +    conf.setCqMinEveryIntervalInMs(cqConfig.getCqMinEveryIntervalInMs());
 +  }
 +
    public void reclaimConsensusMemory() {
      conf.setAllocateMemoryForStorageEngine(
          conf.getAllocateMemoryForStorageEngine() + conf.getAllocateMemoryForConsensus());
diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 690070bff6,30a2eeb644..d014d1052a
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@@ -157,11 -140,13 +161,15 @@@ import org.apache.iotdb.mpp.rpc.thrift.
  import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
  import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
  import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
+ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
  import org.apache.iotdb.rpc.RpcUtils;
  import org.apache.iotdb.rpc.TSStatusCode;
+ import org.apache.iotdb.trigger.api.enums.FailureStrategy;
+ import org.apache.iotdb.trigger.api.enums.TriggerEvent;
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
 +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 +import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+ import org.apache.iotdb.tsfile.write.record.Tablet;
  
  import com.google.common.collect.ImmutableList;
  import org.apache.thrift.TException;
diff --cc thrift-confignode/src/main/thrift/confignode.thrift
index c5c5c5f0ac,4af568e3e0..9069d03c82
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@@ -81,12 -80,14 +81,18 @@@ struct TRatisConfig 
    20: required i64 dataInitialSleepTime
    21: required i64 schemaMaxSleepTime
    22: required i64 dataMaxSleepTime
+ 
+   23: required i64 schemaPreserveWhenPurge
+   24: required i64 dataPreserveWhenPurge
+ 
+   25: required i64 firstElectionTimeoutMin
+   26: required i64 firstElectionTimeoutMax
  }
  
 +struct TCQConfig {
 +  1: required i64 cqMinEveryIntervalInMs
 +}
 +
  struct TDataNodeRemoveReq {
    1: required list<common.TDataNodeLocation> dataNodeLocations
  }


[iotdb] 02/02: Add IT for CQ Management and Execute & change into syntax

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c8f61e87a0277dcc1786199241c0e1f96e5a05e2
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Oct 20 20:13:08 2022 +0800

    Add IT for CQ Management and Execute & change into syntax
---
 docs/UserGuide/Process-Data/Continuous-Query.md    |  32 +-
 docs/zh/UserGuide/Process-Data/Continuous-Query.md |  18 +-
 .../org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java   | 459 ++++++++++++++++++
 .../java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java  | 520 +++++++++++++++++++++
 4 files changed, 1004 insertions(+), 25 deletions(-)

diff --git a/docs/UserGuide/Process-Data/Continuous-Query.md b/docs/UserGuide/Process-Data/Continuous-Query.md
index 3d0fe550c9..b7f727bb0c 100644
--- a/docs/UserGuide/Process-Data/Continuous-Query.md
+++ b/docs/UserGuide/Process-Data/Continuous-Query.md
@@ -121,11 +121,11 @@ Use an `EVERY` interval in the `RESAMPLE` clause to specify the CQ’s execution
 ```sql
 CREATE CONTINUOUS QUERY cq1
 RESAMPLE EVERY 20s
-BEGIN 
-  SELECT max_value(temperature) 
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
-  FROM root.ln.*.* 
-  GROUP BY time(10s) 
+BEGIN
+SELECT max_value(temperature)
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
+  FROM root.ln.*.*
+  GROUP BY time(10s)
 END
 ```
 
@@ -180,7 +180,7 @@ CREATE CONTINUOUS QUERY cq2
 RESAMPLE RANGE 40s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
 END
@@ -255,7 +255,7 @@ CREATE CONTINUOUS QUERY cq3
 RESAMPLE EVERY 20s RANGE 40s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
   FILL(100.0)
@@ -317,11 +317,11 @@ Use an `EVERY` interval and `RANGE` interval in the RESAMPLE clause to specify t
 ```sql
 CREATE CONTINUOUS QUERY cq4
 RESAMPLE EVERY 20s RANGE 40s, 20s
-BEGIN 
-  SELECT max_value(temperature) 
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
-  FROM root.ln.*.* 
-  GROUP BY time(10s) 
+BEGIN
+  SELECT max_value(temperature)
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
+  FROM root.ln.*.*
+  GROUP BY time(10s)
   FILL(100.0)
 END
 ```
@@ -376,10 +376,10 @@ Use an `EVERY` interval in the `RESAMPLE` clause to specify the CQ’s execution
 ```sql
 CREATE CONTINUOUS QUERY cq5
 RESAMPLE EVERY 20s
-BEGIN 
+BEGIN
   SELECT temperature + 1
-  INTO  root.precalculated_sg.::(temperature)
-  FROM root.ln.*.* 
+  INTO root.precalculated_sg.::(temperature)
+  FROM root.ln.*.*
   align by device
 END
 ```
@@ -557,7 +557,7 @@ This step performs the nested sub query in from clause of the query above. The f
 CREATE CQ s1_count_cq 
 BEGIN 
     SELECT count(s1)  
-        INTO root.sg_count.d.count_s1
+        INTO root.sg_count.d(count_s1)
         FROM root.sg.d
         GROUP BY(30m)
 END
diff --git a/docs/zh/UserGuide/Process-Data/Continuous-Query.md b/docs/zh/UserGuide/Process-Data/Continuous-Query.md
index 0d22a04576..4527776535 100644
--- a/docs/zh/UserGuide/Process-Data/Continuous-Query.md
+++ b/docs/zh/UserGuide/Process-Data/Continuous-Query.md
@@ -122,11 +122,11 @@ END
 ```sql
 CREATE CONTINUOUS QUERY cq1
 RESAMPLE EVERY 20s
-BEGIN 
-  SELECT max_value(temperature) 
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
-  FROM root.ln.*.* 
-  GROUP BY time(10s) 
+BEGIN
+  SELECT max_value(temperature)
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
+  FROM root.ln.*.*
+  GROUP BY time(10s)
 END
 ```
 
@@ -182,7 +182,7 @@ CREATE CONTINUOUS QUERY cq2
 RESAMPLE RANGE 40s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
 END
@@ -257,7 +257,7 @@ CREATE CONTINUOUS QUERY cq3
 RESAMPLE EVERY 20s RANGE 40s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
   FILL(100.0)
@@ -322,7 +322,7 @@ CREATE CONTINUOUS QUERY cq4
 RESAMPLE EVERY 20s RANGE 40s, 20s
 BEGIN
   SELECT max_value(temperature)
-  INTO root.ln.wf02.wt02.temperature_max, root.ln.wf02.wt01.temperature_max, root.ln.wf01.wt02.temperature_max, root.ln.wf01.wt01.temperature_max
+  INTO root.ln.wf02.wt02(temperature_max), root.ln.wf02.wt01(temperature_max), root.ln.wf01.wt02(temperature_max), root.ln.wf01.wt01(temperature_max)
   FROM root.ln.*.*
   GROUP BY time(10s)
   FILL(100.0)
@@ -381,7 +381,7 @@ CREATE CONTINUOUS QUERY cq5
 RESAMPLE EVERY 20s
 BEGIN
   SELECT temperature + 1
-  INTO  root.precalculated_sg.::(temperature)
+  INTO root.precalculated_sg.::(temperature)
   FROM root.ln.*.*
   align by device
 END
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
new file mode 100644
index 0000000000..83bb55c157
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQExecIT.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.cq;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category(ClusterIT.class)
+@Ignore
+public class IoTDBCQExecIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  @Test
+  public void testCQExecution1() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d1(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 3_000;
+
+      statement.execute("create timeseries root.sg.d1.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d1.s1_max WITH DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq1\n"
+              + "RESAMPLE EVERY 2s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT max_value(s1) \n"
+              + "  INTO root.sg.d1(s1_max)\n"
+              + "  FROM root.sg.d1\n"
+              + "  GROUP BY time(1s) \n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {
+        startTime + 1_000, startTime + 2_000, startTime + 3_000, startTime + 4_000
+      };
+      long[] expectedValue = {4, 6, 8, 10};
+
+      try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d1.s1_max"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq1");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCQExecution2() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d2(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 4_000;
+
+      statement.execute("create timeseries root.sg.d2.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d2.s1_max WITH DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq2\n"
+              + "RESAMPLE RANGE 4s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT max_value(s1) \n"
+              + "  INTO root.sg.d2(s1_max)\n"
+              + "  FROM root.sg.d2\n"
+              + "  GROUP BY time(1s) \n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {
+        startTime, startTime + 1_000, startTime + 2_000, startTime + 3_000, startTime + 4_000
+      };
+      long[] expectedValue = {2, 4, 6, 8, 10};
+
+      try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d2")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d2.s1_max"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq2");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCQExecution3() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d3(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 4_000;
+
+      statement.execute("create timeseries root.sg.d3.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d3.s1_max WITH DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq3\n"
+              + "RESAMPLE EVERY 20s RANGE 40s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT max_value(s1) \n"
+              + "  INTO root.sg.d3(s1_max)\n"
+              + "  FROM root.sg.d3\n"
+              + "  GROUP BY time(1s) \n"
+              + "  FILL(100)\n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {
+        startTime - 1_000,
+        startTime,
+        startTime + 1_000,
+        startTime + 2_000,
+        startTime + 3_000,
+        startTime + 4_000
+      };
+      long[] expectedValue = {100, 2, 4, 6, 8, 10};
+
+      try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d3")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d3.s1_max"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq3");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCQExecution4() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d4(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 4_000;
+
+      statement.execute("create timeseries root.sg.d4.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d4.s1_max WITH DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq4\n"
+              + "RESAMPLE EVERY 20s RANGE 20s, 10s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT max_value(s1) \n"
+              + "  INTO root.sg.d4(s1_max)\n"
+              + "  FROM root.sg.d4\n"
+              + "  GROUP BY time(1s) \n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {startTime + 2_000, startTime + 4_000};
+      long[] expectedValue = {6, 10};
+
+      try (ResultSet resultSet = statement.executeQuery("select s1_max from root.sg.d4")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d4.s1_max"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq4");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCQExecution5() {
+    String insertTemplate =
+        "INSERT INTO root.sg.d5(time, s1) VALUES (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d) (%d, %d)";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long now = System.currentTimeMillis();
+      long firstExecutionTime = now + 10_000;
+      long startTime = firstExecutionTime - 4_000;
+
+      statement.execute("create timeseries root.sg.d5.s1 WITH DATATYPE=INT64");
+      statement.execute("create timeseries root.sg.d5.precalculated_s1 WITH DATATYPE=INT64");
+
+      statement.execute(
+          String.format(
+              insertTemplate,
+              startTime,
+              1,
+              startTime + 500,
+              2,
+              startTime + 1_000,
+              3,
+              startTime + 1_500,
+              4,
+              startTime + 2_000,
+              5,
+              startTime + 2_500,
+              6,
+              startTime + 3_000,
+              7,
+              startTime + 3_500,
+              8,
+              startTime + 4_000,
+              9,
+              startTime + 4_500,
+              10));
+
+      statement.execute(
+          "CREATE CONTINUOUS QUERY cq5\n"
+              + "RESAMPLE EVERY 2s\n"
+              + "BBOUNDARY "
+              + firstExecutionTime
+              + "BEGIN \n"
+              + "  SELECT s1 + 1 \n"
+              + "  INTO root.sg.d5(precalculated_s1)\n"
+              + "  FROM root.sg.d5\n"
+              + "  align by device\n"
+              + "END");
+
+      long targetTime = firstExecutionTime + 5_000;
+
+      while (System.currentTimeMillis() - targetTime < 0) {
+        TimeUnit.SECONDS.sleep(1);
+      }
+
+      long[] expectedTime = {
+        startTime,
+        startTime + 500,
+        startTime + 1_000,
+        startTime + 1_500,
+        startTime + 2_000,
+        startTime + 2_500,
+        startTime + 3_000,
+        startTime + 3_500,
+        startTime + 4_000,
+        startTime + 4_500
+      };
+      long[] expectedValue = {2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
+
+      try (ResultSet resultSet =
+          statement.executeQuery("select precalculated_s1 from root.sg.d5")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          assertEquals(expectedTime[cnt], resultSet.getLong(TIMESTAMP_STR));
+          assertEquals(expectedValue[cnt], resultSet.getLong("root.sg.d5.precalculated_s1"));
+          cnt++;
+        }
+        assertEquals(expectedTime.length, cnt);
+      }
+
+      statement.execute("DROP CQ cq5");
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
new file mode 100644
index 0000000000..49a4853190
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/cq/IoTDBCQIT.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.cq;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category(ClusterIT.class)
+@Ignore
+public class IoTDBCQIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  // =======================================create cq======================================
+  @Test
+  public void testCreateWrongCQ() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      // 1. specify first parameter of group by time
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq \n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY([0, 10), 30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 2. specify time filter in where clause
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq \n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    WHERE time >= 0 and time <= 10\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 3. no every clause meanwhile no group by time
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq \n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 4. no INTO clause
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 5. EVERY interval is less than continuous_query_min_every_interval_in_ms in
+      // iotdb-confignode.properties
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE EVERY 50ms\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 6. start_time_offset < 0
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE -1m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 7. start_time_offset == 0
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 0m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 8. end_time_offset < 0
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m, -1m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 9. end_time_offset == start_time_offset
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m, 30m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 10. end_time_offset > start_time_offset
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m, 31m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 11. group_by_interval > start_time_offset
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(1h)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 12. TIMEOUT POLICY is not BLOCKED or DISCARD
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_2\n"
+                + "RESAMPLE RANGE 30m\n"
+                + "TIMEOUT POLICY UNKNOWN\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(1h)\n"
+                + "END";
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+      // 13. create duplicated cq
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq \n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+        statement.execute(sql);
+        fail();
+      } catch (Exception e) {
+        // TODO add assert for error message
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testCreateCorrectCQ() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      try {
+        String sql =
+            "CREATE CQ correct_cq_1 \n"
+                + "RESAMPLE \n"
+                + "  EVERY 30m\n"
+                + "  BOUNDARY 0\n"
+                + "  RANGE 30m, 10m\n"
+                + "TIMEOUT POLICY BLOCKED\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try {
+        String sql =
+            "CREATE CQ correct_cq_2\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(30m)\n"
+                + "END";
+        statement.execute(sql);
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try {
+        String sql =
+            "CREATE CQ correct_cq_3\n"
+                + "RESAMPLE RANGE 30m, 0m\n"
+                + "TIMEOUT POLICY DISCARD\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(10m)\n"
+                + "END";
+        statement.execute(sql);
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+
+      try {
+        String sql =
+            "CREATE CQ s1_count_cq_correct\n"
+                + "RESAMPLE EVERY 30m \n"
+                + "TIMEOUT POLICY DISCARD\n"
+                + "BEGIN \n"
+                + "  SELECT count(s1)  \n"
+                + "    INTO root.sg_count.d(count_s1)\n"
+                + "    FROM root.sg.d\n"
+                + "    GROUP BY(10m)\n"
+                + "END";
+        statement.execute(sql);
+      } catch (Exception e) {
+        e.printStackTrace();
+        fail(e.getMessage());
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // =======================================show cq======================================
+  @Test
+  public void testShowCQ() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      String[] cqIds = {"show_cq_1", "show_cq_2", "show_cq_3", "show_cq_4"};
+      String[] cqSQLs = {
+        "CREATE CQ show_cq_1 \n"
+            + "RESAMPLE \n"
+            + "  EVERY 30m\n"
+            + "  BOUNDARY 0\n"
+            + "  RANGE 30m, 10m\n"
+            + "TIMEOUT POLICY BLOCKED\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ show_cq_2\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ show_cq_3\n"
+            + "RESAMPLE RANGE 30m, 0m\n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END",
+        "CREATE CQ show_cq_4\n"
+            + "RESAMPLE EVERY 30m \n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END"
+      };
+
+      for (String sql : cqSQLs) {
+        statement.execute(sql);
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("show CQS")) {
+
+        int cnt = 0;
+        while (resultSet.next()) {
+          // No need to add time column for aggregation query
+          assertEquals(cqIds[cnt], resultSet.getString(0));
+          assertEquals(cqSQLs[cnt], resultSet.getString(1));
+          assertEquals("ACTIVE", resultSet.getString(2));
+          cnt++;
+        }
+        assertEquals(cqIds.length, cnt);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // =======================================drop cq======================================
+  @Test
+  public void testDropCQ() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      String[] cqIds = {"drop_cq_1", "drop_cq_2", "drop_cq_3", "drop_cq_4"};
+      String[] cqSQLs = {
+        "CREATE CQ drop_cq_1 \n"
+            + "RESAMPLE \n"
+            + "  EVERY 30m\n"
+            + "  BOUNDARY 0\n"
+            + "  RANGE 30m, 10m\n"
+            + "TIMEOUT POLICY BLOCKED\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ drop_cq_2\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(30m)\n"
+            + "END",
+        "CREATE CQ drop_cq_3\n"
+            + "RESAMPLE RANGE 30m, 0m\n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END",
+        "CREATE CQ drop_cq_4\n"
+            + "RESAMPLE EVERY 30m \n"
+            + "TIMEOUT POLICY DISCARD\n"
+            + "BEGIN \n"
+            + "  SELECT count(s1)  \n"
+            + "    INTO root.sg_count.d(count_s1)\n"
+            + "    FROM root.sg.d\n"
+            + "    GROUP BY(10m)\n"
+            + "END"
+      };
+
+      for (String sql : cqSQLs) {
+        statement.execute(sql);
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("show CQS")) {
+
+        int cnt = 0;
+        while (resultSet.next()) {
+          // No need to add time column for aggregation query
+          assertEquals(cqIds[cnt], resultSet.getString(0));
+          assertEquals(cqSQLs[cnt], resultSet.getString(1));
+          assertEquals("ACTIVE", resultSet.getString(2));
+          cnt++;
+        }
+        assertEquals(cqIds.length, cnt);
+      }
+
+      statement.execute("DROP CQ drop_cq_2");
+      statement.execute("DROP CQ drop_cq_3");
+
+      int[] resultIndex = {0, 3};
+
+      try (ResultSet resultSet = statement.executeQuery("show CQS")) {
+
+        int cnt = 0;
+        while (resultSet.next()) {
+          // No need to add time column for aggregation query
+          assertEquals(cqIds[resultIndex[cnt]], resultSet.getString(0));
+          assertEquals(cqSQLs[resultIndex[cnt]], resultSet.getString(1));
+          assertEquals("ACTIVE", resultSet.getString(2));
+          cnt++;
+        }
+        assertEquals(resultIndex.length, cnt);
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}