You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/16 02:33:24 UTC

[iotdb] branch xingtanzjr/mpp-query-basis updated (74517ca -> ffbcbbd)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from 74517ca  Milestone: complete the basic frame of query executing state in MPP
     new e7d8a19  add some interface
     new ffbcbbd  mpp interface definition

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  5 ++
 server/pom.xml                                     |  4 +
 .../buffer/SinkHandler.java}                       | 33 ++++----
 .../buffer/SourceHandler.java}                     | 24 +++---
 .../CsvSinkNode.java => mpp/common/Analysis.java}  | 24 +-----
 .../common/FillPolicy.java}                        | 23 +-----
 .../common/GroupByTimeParameter.java}              | 27 ++----
 .../CsvSinkNode.java => mpp/common/OrderBy.java}   | 27 ++----
 .../common/QueryContext.java}                      | 29 +++----
 .../CsvSinkNode.java => mpp/common/QueryId.java}   | 23 ++----
 .../common/QuerySession.java}                      | 23 +-----
 .../common/TreeNode.java}                          | 27 +++---
 .../org/apache/iotdb/db/mpp/common/TsBlock.java    | 48 +++++++++++
 .../iotdb/db/mpp/common/TsBlockMetadata.java       | 41 +++++++++
 .../common/WithoutPolicy.java}                     | 24 ++----
 .../execution/Coordinator.java}                    | 29 ++++---
 .../db/mpp/execution/ExecFragmentInstance.java     | 57 +++++++++++++
 .../iotdb/db/mpp/execution/QueryExecution.java     | 96 ++++++++++++++++++++++
 .../iotdb/db/mpp/execution/QueryScheduler.java     | 51 ++++++++++++
 .../execution/QueryStateMachine.java}              | 25 ++----
 .../org/apache/iotdb/db/mpp/operator/Operator.java | 91 ++++++++++++++++++++
 .../operator/OperatorContext.java}                 | 29 ++++---
 .../operator/SeriesScanOperator.java}              | 21 +----
 .../plan/DistributedQueryPlan.java}                | 28 +++----
 .../plan/DistributionPlanner.java}                 | 27 +++---
 .../plan/FragmentInstance.java}                    | 25 ++----
 .../plan/FragmentInstanceId.java}                  | 24 ++----
 .../plan/LogicalPlanner.java}                      | 29 ++++---
 .../plan/LogicalQueryPlan.java}                    | 28 +++----
 .../plan/PlanFragment.java}                        | 29 ++-----
 .../plan/PlanFragmentId.java}                      | 23 ++----
 .../plan/node/PlanNode.java}                       | 26 +++---
 .../db/{query => }/mpp/plan/node/PlanNodeId.java   | 26 +++---
 .../plan/node/PlanNodeIdAllocator.java}            | 24 +-----
 .../db/mpp/plan/node/process/DeviceMergeNode.java  | 66 +++++++++++++++
 .../plan/node/process/FillNode.java}               | 26 +++---
 .../plan/node/process/FilterNode.java}             | 26 +++---
 .../db/mpp/plan/node/process/GroupByLevelNode.java | 41 +++++++++
 .../plan/node/process/LimitNode.java}              | 25 +++---
 .../plan/node/process/OffsetNode.java}             | 26 +++---
 .../plan/node/process/ProcessNode.java}            | 23 ++----
 .../node/process/RowBasedSeriesAggregateNode.java  | 59 +++++++++++++
 .../plan/node/process/SortNode.java}               | 26 +++---
 .../db/mpp/plan/node/process/TimeJoinNode.java     | 67 +++++++++++++++
 .../plan/node/process/WithoutNode.java}            | 26 +++---
 .../mpp/plan/node/sink/CsvSinkNode.java            | 13 +--
 .../plan/node/sink/FragmentSinkNode.java}          | 21 ++---
 .../{query => }/mpp/plan/node/sink/SinkNode.java   | 17 ++--
 .../mpp/plan/node/sink/ThriftSinkNode.java         | 13 +--
 .../mpp/plan/node/source/CsvSourceNode.java        |  9 +-
 .../mpp/plan/node/source/SeriesAggregateNode.java  | 81 ++++++++++++++++++
 .../db/mpp/plan/node/source/SeriesScanNode.java    | 85 +++++++++++++++++++
 .../mpp/plan/node/source/SourceNode.java           | 11 ++-
 .../plan/optimzation/PlanOptimizer.java}           | 25 ++----
 .../apache/iotdb/db/query/mpp/common/Analysis.java |  8 --
 .../iotdb/db/query/mpp/common/FillPolicy.java      |  5 --
 .../db/query/mpp/common/GroupByTimeParameter.java  | 10 ---
 .../apache/iotdb/db/query/mpp/common/OrderBy.java  | 11 ---
 .../iotdb/db/query/mpp/common/QueryContext.java    | 10 ---
 .../apache/iotdb/db/query/mpp/common/QueryId.java  | 13 ---
 .../iotdb/db/query/mpp/common/QuerySession.java    |  4 -
 .../apache/iotdb/db/query/mpp/common/TreeNode.java | 23 ------
 .../apache/iotdb/db/query/mpp/common/TsBlock.java  | 30 -------
 .../iotdb/db/query/mpp/common/TsBlockMetadata.java | 19 -----
 .../iotdb/db/query/mpp/common/WithoutPolicy.java   |  6 --
 .../iotdb/db/query/mpp/exec/Coordinator.java       | 28 -------
 .../iotdb/db/query/mpp/exec/QueryExecution.java    | 86 -------------------
 .../iotdb/db/query/mpp/exec/QueryScheduler.java    | 38 ---------
 .../iotdb/db/query/mpp/exec/QueryStateMachine.java |  8 --
 .../db/query/mpp/plan/DistributedQueryPlan.java    | 16 ----
 .../db/query/mpp/plan/DistributionPlanner.java     | 17 ----
 .../iotdb/db/query/mpp/plan/FragmentInstance.java  | 11 ---
 .../db/query/mpp/plan/FragmentInstanceId.java      | 10 ---
 .../iotdb/db/query/mpp/plan/LogicalPlanner.java    | 23 ------
 .../iotdb/db/query/mpp/plan/LogicalQueryPlan.java  | 13 ---
 .../iotdb/db/query/mpp/plan/PlanFragment.java      | 13 ---
 .../iotdb/db/query/mpp/plan/PlanFragmentId.java    |  8 --
 .../iotdb/db/query/mpp/plan/node/PlanNode.java     | 17 ----
 .../query/mpp/plan/node/PlanNodeIdAllocator.java   |  7 --
 .../mpp/plan/node/process/DeviceMergeNode.java     | 47 -----------
 .../db/query/mpp/plan/node/process/FillNode.java   | 23 ------
 .../db/query/mpp/plan/node/process/FilterNode.java | 22 -----
 .../mpp/plan/node/process/GroupByLevelNode.java    | 25 ------
 .../db/query/mpp/plan/node/process/LimitNode.java  | 22 -----
 .../db/query/mpp/plan/node/process/OffsetNode.java | 22 -----
 .../query/mpp/plan/node/process/ProcessNode.java   | 11 ---
 .../node/process/RowBasedSeriesAggregateNode.java  | 37 ---------
 .../db/query/mpp/plan/node/process/SortNode.java   | 22 -----
 .../query/mpp/plan/node/process/TimeJoinNode.java  | 48 -----------
 .../query/mpp/plan/node/process/WithoutNode.java   | 22 -----
 .../query/mpp/plan/node/sink/FragmentSinkNode.java | 19 -----
 .../mpp/plan/node/source/SeriesAggregateNode.java  | 63 --------------
 .../query/mpp/plan/node/source/SeriesScanNode.java | 60 --------------
 .../query/mpp/plan/optimzation/PlanOptimizer.java  |  9 --
 94 files changed, 1150 insertions(+), 1492 deletions(-)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/buffer/SinkHandler.java} (51%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/buffer/SourceHandler.java} (68%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/common/Analysis.java} (67%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/common/FillPolicy.java} (67%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/common/GroupByTimeParameter.java} (66%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/common/OrderBy.java} (67%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/common/QueryContext.java} (67%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/common/QueryId.java} (68%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/common/QuerySession.java} (67%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/common/TreeNode.java} (61%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/common/WithoutPolicy.java} (67%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/execution/Coordinator.java} (55%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecFragmentInstance.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/execution/QueryStateMachine.java} (71%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/operator/OperatorContext.java} (58%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/operator/SeriesScanOperator.java} (71%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/DistributedQueryPlan.java} (59%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/DistributionPlanner.java} (63%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/FragmentInstance.java} (65%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/FragmentInstanceId.java} (68%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/LogicalPlanner.java} (54%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/LogicalQueryPlan.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/PlanFragment.java} (64%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/PlanFragmentId.java} (68%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/node/PlanNode.java} (58%)
 rename server/src/main/java/org/apache/iotdb/db/{query => }/mpp/plan/node/PlanNodeId.java (75%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/node/PlanNodeIdAllocator.java} (67%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/node/process/FillNode.java} (59%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/node/process/FilterNode.java} (59%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/node/process/LimitNode.java} (63%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/node/process/OffsetNode.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/node/process/ProcessNode.java} (69%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/node/process/SortNode.java} (58%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/node/process/WithoutNode.java} (57%)
 copy server/src/main/java/org/apache/iotdb/db/{query => }/mpp/plan/node/sink/CsvSinkNode.java (83%)
 copy server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/ThriftSinkNode.java => mpp/plan/node/sink/FragmentSinkNode.java} (72%)
 rename server/src/main/java/org/apache/iotdb/db/{query => }/mpp/plan/node/sink/SinkNode.java (73%)
 rename server/src/main/java/org/apache/iotdb/db/{query => }/mpp/plan/node/sink/ThriftSinkNode.java (83%)
 rename server/src/main/java/org/apache/iotdb/db/{query => }/mpp/plan/node/source/CsvSourceNode.java (86%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
 rename server/src/main/java/org/apache/iotdb/db/{query => }/mpp/plan/node/source/SourceNode.java (79%)
 rename server/src/main/java/org/apache/iotdb/db/{query/mpp/plan/node/sink/CsvSinkNode.java => mpp/plan/optimzation/PlanOptimizer.java} (67%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/Analysis.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/FillPolicy.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/GroupByTimeParameter.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/OrderBy.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryContext.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryId.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/QuerySession.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/TreeNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/TsBlock.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/TsBlockMetadata.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/common/WithoutPolicy.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributedQueryPlan.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributionPlanner.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/FragmentInstance.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/FragmentInstanceId.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalPlanner.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalQueryPlan.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanFragment.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanFragmentId.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeIdAllocator.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/DeviceMergeNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FillNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FilterNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/GroupByLevelNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/LimitNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/OffsetNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/ProcessNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/SortNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/TimeJoinNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/WithoutNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/FragmentSinkNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/mpp/plan/optimzation/PlanOptimizer.java

[iotdb] 02/02: mpp interface definition

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ffbcbbdd3be942c576da05212204a07308fd84a8
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Mar 16 10:32:24 2022 +0800

    mpp interface definition
---
 .../buffer/SinkHandler.java}                       |  23 +++--
 .../buffer/SourceHandler.java}                     |  14 +--
 .../common/Analysis.java}                          |   8 +-
 .../common/FillPolicy.java}                        |   7 +-
 .../common/GroupByTimeParameter.java}              |  23 ++---
 .../common/OrderBy.java}                           |  11 ++-
 .../common/QueryContext.java}                      |  13 ++-
 .../common/QueryId.java}                           |  15 ++-
 .../common/QuerySession.java}                      |   7 +-
 .../common/TreeNode.java}                          |  29 +++---
 .../common/TsBlock.java}                           |  37 +++++---
 .../iotdb/db/mpp/common/TsBlockMetadata.java       |  41 ++++++++
 .../common/WithoutPolicy.java}                     |   8 +-
 .../mpp/exec => mpp/execution}/Coordinator.java    |  31 +++---
 .../db/mpp/execution/ExecFragmentInstance.java     |  57 +++++++++++
 .../iotdb/db/mpp/execution/QueryExecution.java     |  96 +++++++++++++++++++
 .../iotdb/db/mpp/execution/QueryScheduler.java     |  51 ++++++++++
 .../exec => mpp/execution}/QueryStateMachine.java  |   9 +-
 .../org/apache/iotdb/db/mpp/operator/Operator.java |  91 ++++++++++++++++++
 .../{query => }/mpp/operator/OperatorContext.java  |  24 ++---
 .../mpp/operator/SeriesScanOperator.java           |   5 +-
 .../plan/DistributedQueryPlan.java}                |  24 +++--
 .../plan/DistributionPlanner.java}                 |  25 +++--
 .../plan/FragmentInstance.java}                    |  21 ++---
 .../plan/FragmentInstanceId.java}                  |  18 ++--
 .../plan/LogicalPlanner.java}                      |  29 +++---
 .../plan/LogicalQueryPlan.java}                    |  26 +++---
 .../plan/PlanFragment.java}                        |  23 ++---
 .../plan/PlanFragmentId.java}                      |  11 ++-
 .../plan/node/PlanNode.java}                       |  26 +++---
 .../db/{query => }/mpp/plan/node/PlanNodeId.java   |  26 +++---
 .../plan/node/PlanNodeIdAllocator.java}            |   8 +-
 .../db/mpp/plan/node/process/DeviceMergeNode.java  |  66 +++++++++++++
 .../plan/node/process/FillNode.java}               |  26 +++---
 .../plan/node/process/FilterNode.java}             |  26 +++---
 .../db/mpp/plan/node/process/GroupByLevelNode.java |  41 ++++++++
 .../plan/node/process/LimitNode.java}              |  25 ++---
 .../plan/node/process/OffsetNode.java}             |  28 +++---
 .../plan/node/process/ProcessNode.java}            |  23 ++---
 .../node/process/RowBasedSeriesAggregateNode.java  |  59 ++++++++++++
 .../plan/node/process/SortNode.java}               |  29 +++---
 .../db/mpp/plan/node/process/TimeJoinNode.java     |  67 +++++++++++++
 .../plan/node/process/WithoutNode.java}            |  26 +++---
 .../mpp/plan/node/sink/CsvSinkNode.java            |  13 +--
 .../plan/node/sink/FragmentSinkNode.java}          |  21 ++---
 .../{query => }/mpp/plan/node/sink/SinkNode.java   |  17 ++--
 .../mpp/plan/node/sink/ThriftSinkNode.java         |  13 +--
 .../mpp/plan/node/source/CsvSourceNode.java        |   8 +-
 .../mpp/plan/node/source/SeriesAggregateNode.java  |  81 ++++++++++++++++
 .../mpp/plan/node/source/SeriesScanNode.java       |  14 ++-
 .../mpp/plan/node/source/SourceNode.java           |  11 +--
 .../plan/optimzation/PlanOptimizer.java}           |  21 ++---
 .../apache/iotdb/db/query/mpp/common/Analysis.java |   8 --
 .../iotdb/db/query/mpp/common/FillPolicy.java      |   5 -
 .../db/query/mpp/common/GroupByTimeParameter.java  |  10 --
 .../apache/iotdb/db/query/mpp/common/OrderBy.java  |  11 ---
 .../iotdb/db/query/mpp/common/QueryContext.java    |  10 --
 .../apache/iotdb/db/query/mpp/common/QueryId.java  |  13 ---
 .../iotdb/db/query/mpp/common/QuerySession.java    |   4 -
 .../apache/iotdb/db/query/mpp/common/TreeNode.java |  23 -----
 .../apache/iotdb/db/query/mpp/common/TsBlock.java  |  30 ------
 .../iotdb/db/query/mpp/common/TsBlockMetadata.java |  19 ----
 .../iotdb/db/query/mpp/common/WithoutPolicy.java   |   6 --
 .../db/query/mpp/exec/ExecFragmentInstance.java    |  59 ------------
 .../iotdb/db/query/mpp/exec/QueryExecution.java    | 104 ---------------------
 .../iotdb/db/query/mpp/exec/QueryScheduler.java    |  56 -----------
 .../iotdb/db/query/mpp/operator/Operator.java      | 102 --------------------
 .../db/query/mpp/plan/DistributedQueryPlan.java    |  16 ----
 .../db/query/mpp/plan/DistributionPlanner.java     |  17 ----
 .../iotdb/db/query/mpp/plan/FragmentInstance.java  |  11 ---
 .../db/query/mpp/plan/FragmentInstanceId.java      |  10 --
 .../iotdb/db/query/mpp/plan/LogicalPlanner.java    |  23 -----
 .../iotdb/db/query/mpp/plan/LogicalQueryPlan.java  |  13 ---
 .../iotdb/db/query/mpp/plan/PlanFragment.java      |  13 ---
 .../iotdb/db/query/mpp/plan/PlanFragmentId.java    |   8 --
 .../iotdb/db/query/mpp/plan/node/PlanNode.java     |  17 ----
 .../query/mpp/plan/node/PlanNodeIdAllocator.java   |   7 --
 .../mpp/plan/node/process/DeviceMergeNode.java     |  47 ----------
 .../db/query/mpp/plan/node/process/FillNode.java   |  23 -----
 .../db/query/mpp/plan/node/process/FilterNode.java |  22 -----
 .../mpp/plan/node/process/GroupByLevelNode.java    |  25 -----
 .../db/query/mpp/plan/node/process/LimitNode.java  |  22 -----
 .../db/query/mpp/plan/node/process/OffsetNode.java |  22 -----
 .../query/mpp/plan/node/process/ProcessNode.java   |  11 ---
 .../node/process/RowBasedSeriesAggregateNode.java  |  37 --------
 .../db/query/mpp/plan/node/process/SortNode.java   |  22 -----
 .../query/mpp/plan/node/process/TimeJoinNode.java  |  48 ----------
 .../query/mpp/plan/node/process/WithoutNode.java   |  22 -----
 .../query/mpp/plan/node/sink/FragmentSinkNode.java |  19 ----
 .../mpp/plan/node/source/SeriesAggregateNode.java  |  82 ----------------
 .../query/mpp/plan/optimzation/PlanOptimizer.java  |   9 --
 91 files changed, 1046 insertions(+), 1412 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandler.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandler.java
index 91ec40f..1b2c37d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandler.java
@@ -16,21 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.buffer;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.io.Closeable;
 import java.nio.ByteBuffer;
 
-public interface ExchangeDataSource extends Closeable {
+public interface SinkHandler {
 
-    ByteBuffer pollTsBlock();
+  /** Get a future that will be completed when the buffer is not full. */
+  ListenableFuture<Void> isFull();
 
-    boolean isFinished();
+  /**
+   * Sends a tsBlock to an unpartitioned buffer. If no-more-pages has been set, the send tsBlock
+   * call is ignored. This can happen with limit queries.
+   */
+  void send(ByteBuffer tsBlock);
 
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  /**
+   * Sends a tsBlock to a specific partition. If no-more-pages has been set, the send tsBlock call
+   * is ignored. This can happen with limit queries.
+   */
+  void send(int partition, ByteBuffer tsBlock);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandler.java
similarity index 80%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandler.java
index 91ec40f..99f5d84 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandler.java
@@ -16,21 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.buffer;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
 
-public interface ExchangeDataSource extends Closeable {
+public interface SourceHandler extends Closeable {
 
-    ByteBuffer pollTsBlock();
+  ByteBuffer receive();
 
-    boolean isFinished();
+  boolean isFinished();
 
-    ListenableFuture<Void> isBlocked();
+  ListenableFuture<Void> isBlocked();
 
-    @Override
-    void close();
+  @Override
+  void close();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
similarity index 77%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
index eff9fa7..178402a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/Analysis.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-public class SeriesScanOperator {
-}
+/** Analysis used for planning a query. TODO: This class may need to store more info for a query. */
+public class Analysis {}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FillPolicy.java
similarity index 85%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FillPolicy.java
index eff9fa7..1b753d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FillPolicy.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-public class SeriesScanOperator {
+public enum FillPolicy {
+  PREVIOUS,
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/GroupByTimeParameter.java
similarity index 69%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/GroupByTimeParameter.java
index 91ec40f..3368274 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/GroupByTimeParameter.java
@@ -16,21 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
-}
+/**
+ * In single-node IoTDB, the GroupByTimePlan is used to represent the parameter of `group by time`.
+ * To avoid ambiguity, we use another name `GroupByTimeParameter` here.
+ */
+public class GroupByTimeParameter extends GroupByTimePlan {}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/OrderBy.java
similarity index 75%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/OrderBy.java
index eff9fa7..a8d9b94 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/OrderBy.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,7 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-public class SeriesScanOperator {
+/** The traversal order for operators, by timestamp or by device name. */
+public enum OrderBy {
+  TIMESTAMP_ASC,
+  TIMESTAMP_DESC,
+  DEVICE_NAME_ASC,
+  DEVICE_NAME_DESC,
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryContext.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/QueryContext.java
index 10ba210..2f6715d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryContext.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,11 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.exec;
+package org.apache.iotdb.db.mpp.common;
 
 /**
- * State machine for a QueryExecution. It stores the states for the QueryExecution.
- * Others can register listeners when the state changes of the QueryExecution.
+ * This class is used to record the context of a query including QueryId, query statement, session
+ * info and so on
  */
-public class QueryStateMachine {
+public class QueryContext {
+  private String statement;
+  private QueryId queryId;
+  private QuerySession session;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
similarity index 76%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
index eff9fa7..dd8d436 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,7 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-public class SeriesScanOperator {
+public class QueryId {
+  private String Id;
+
+  public String getId() {
+    return Id;
+  }
+
+  public void setId(String id) {
+    Id = id;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java
similarity index 85%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java
index eff9fa7..3f1d165 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-public class SeriesScanOperator {
-}
+public class QuerySession {}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
index 91ec40f..4a11358 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
@@ -16,21 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
+/**
+ * A simple class to describe the tree style structure of query executable operators.
+ * @param <T> the concrete node type, which is also the type of the children
+ */
+public class TreeNode<T extends TreeNode<T>> {
+  protected List<T> children;
 
-    boolean isFinished();
+  public T getChild(int i) {
+    return hasChild(i) ? children.get(i) : null;
+  }
 
-    ListenableFuture<Void> isBlocked();
+  public boolean hasChild(int i) {
+    return children.size() > i;
+  }
 
-    @Override
-    void close();
+  public void addChild(T n) {
+    children.add(n);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
index 84c2964..aa40205 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
@@ -16,24 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
 
 /**
- * Contains information about {@link Operator} execution.
- * <p>
- * Not thread-safe.
+ * Intermediate result for most of ExecOperators. The TsBlock contains data from one or more
+ * columns and constructs them as a row based view. The columns can be series, aggregation result
+ * for one series or scalar value (such as deviceName). The TsBlock also contains the metadata to
+ * describe the columns.
+ *
+ * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
  */
-public class OperatorContext {
+public class TsBlock {
+
+  // Describe the column info
+  private TsBlockMetadata metadata;
+
+  public boolean hasNext() {
+    return false;
+  }
 
-    private final int operatorId;
-    private final PlanNodeId planNodeId;
-    private final String operatorType;
+  // Get next row in current TsBlock
+  public RowRecord getNext() {
+    return null;
+  }
 
-    public OperatorContext(int operatorId, PlanNodeId planNodeId, String operatorType) {
-        this.operatorId = operatorId;
-        this.planNodeId = planNodeId;
-        this.operatorType = operatorType;
-    }
+  public TsBlockMetadata getMetadata() {
+    return metadata;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
new file mode 100644
index 0000000..d8e480c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.common;
+
+import java.util.List;
+
+public class TsBlockMetadata {
+  // List of all columns in the current TsBlock.
+  // The column list not only contains the series columns, but also contains other columns used to
+  // construct the final result
+  // set, such as timestamp and deviceName
+  private List<String> columnList;
+
+  // Indicate whether the result set should be aligned by device. This parameter can be used by
+  // downstream operators
+  // when processing data from current TsBlock. The RowRecord produced by a TsBlock with
+  // `alignedByDevice = true` will contain
+  // n + 1 fields which are n series fields and 1 deviceName field.
+  // For example, when the FilterOperator executes the filter operation, it may need the
+  // deviceName field when matching
+  // the series with the corresponding column in the TsBlock
+  //
+  // If alignedByDevice is true, the owned series should belong to one device
+  private boolean alignedByDevice;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
index eff9fa7..7a4107e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.common;
 
-public class SeriesScanOperator {
+public enum WithoutPolicy {
+  CONTAINS_NULL,
+  ALL_NULL
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index b76bc8b..f6c85e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -16,31 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.exec;
+package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.db.query.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.QueryId;
 
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * The coordinator for MPP.
- * It manages all the queries which are executed in current Node. And it will be responsible for the lifecycle of a query.
- * A query request will be represented as a QueryExecution.
+ * The coordinator for MPP. It manages all the queries which are executed in current Node. And it
+ * will be responsible for the lifecycle of a query. A query request will be represented as a
+ * QueryExecution.
  */
 public class Coordinator {
 
-    private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
+  private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
 
-    private QueryExecution createQueryExecution() {
-        return null;
-    }
+  private QueryExecution createQueryExecution() {
+    return null;
+  }
 
-    private QueryExecution getQueryExecutionById() {
-        return null;
-    }
+  private QueryExecution getQueryExecutionById() {
+    return null;
+  }
 
-//    private TQueryResponse executeQuery(TQueryRequest request) {
-//
-//    }
+  //    private TQueryResponse executeQuery(TQueryRequest request) {
+  //
+  //    }
 }
-
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecFragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecFragmentInstance.java
new file mode 100644
index 0000000..50f46ed
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecFragmentInstance.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.units.Duration;
+
+import java.io.Closeable;
+
+/**
+ * ExecFragmentInstance encapsulates some methods which are necessary for execution scheduler
+ * to run a fragment instance
+ */
+public interface ExecFragmentInstance extends Closeable {
+
+  /**
+   * Used to judge whether this fragment instance has any more data to process
+   *
+   * @return true if the FragmentInstance is done, otherwise false.
+   */
+  boolean isFinished();
+
+  /**
+   * Run the fragment instance for a time slice of {@code duration}. The time of this run is likely
+   * not to be equal to {@code duration}; the actual run time should be calculated by the caller.
+   *
+   * @param duration how long should this fragment instance run
+   * @return a {@code ListenableFuture<Void>} that represents the status of this processing. If
+   *     isDone() returns true, this fragment instance is not blocked and is ready for the next
+   *     processing; otherwise, this fragment instance is blocked and is not ready for the next
+   *     processing.
+   */
+  ListenableFuture<Void> processFor(Duration duration);
+
+  /** @return the information about this Fragment Instance in String format */
+  String getInfo();
+
+  /** clear resource used by this fragment instance */
+  @Override
+  void close();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
new file mode 100644
index 0000000..2b9d8cf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.Analysis;
+import org.apache.iotdb.db.mpp.common.QueryContext;
+import org.apache.iotdb.db.mpp.plan.*;
+import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * QueryExecution stores all the status of a query which is being prepared or running inside the MPP
+ * frame. It takes three main responsibilities: 1. Prepare a query. Transform a query from statement
+ * to DistributedQueryPlan with fragment instances. 2. Dispatch all the fragment instances to
+ * corresponding physical nodes. 3. Collect and monitor the progress/states of this query.
+ */
+public class QueryExecution {
+  private QueryContext context;
+  private QueryScheduler scheduler;
+  private QueryStateMachine stateMachine;
+
+  private List<PlanOptimizer> planOptimizers;
+
+  private Analysis analysis;
+  private LogicalQueryPlan logicalPlan;
+  private DistributedQueryPlan distributedPlan;
+  private List<PlanFragment> fragments;
+  private List<FragmentInstance> fragmentInstances;
+
+  public QueryExecution(QueryContext context) {
+    this.context = context;
+  }
+
+  public void plan() {
+    analyze();
+    doLogicalPlan();
+    doDistributedPlan();
+    planFragmentInstances();
+  }
+
+  public void schedule() {
+    this.scheduler = new QueryScheduler(this.stateMachine, this.fragmentInstances);
+    this.scheduler.start();
+  }
+
+  // Analyze the statement in QueryContext. Generate the analysis this query needs
+  public void analyze() {
+    // initialize the variable `analysis`
+
+  }
+
+  // Use LogicalPlanner to do the logical query plan and logical optimization
+  public void doLogicalPlan() {
+    LogicalPlanner planner = new LogicalPlanner(this.analysis, this.context, this.planOptimizers);
+    this.logicalPlan = planner.plan();
+  }
+
+  // Generate the distributed plan and split it into fragments
+  public void doDistributedPlan() {
+    DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
+    this.distributedPlan = planner.planFragments();
+  }
+
+  // Convert fragment to detailed instance
+  // And for parallel-able fragment, clone it into several instances with different params.
+  public void planFragmentInstances() {}
+
+  /**
+   * This method will be called by the request thread from client connection. This method will block
+   * until one of these conditions occurs: 1. There is a batch of result; 2. There is no more
+   * result; 3. The query has been cancelled; 4. The query has timed out. This method will fetch the
+   * result from DataStreamManager using the virtual ResultOperator's ID (This part will be designed
+   * and implemented with DataStreamManager)
+   */
+  public ByteBuffer getBatchResult() {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
new file mode 100644
index 0000000..fc5df30
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.plan.FragmentInstance;
+
+import java.util.List;
+
+/**
+ * QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will
+ * continue to collect and monitor the query execution until the query is finished.
+ *
+ * <p>Later, we can add more control logic for a QueryExecution such as retry, kill and so on by
+ * this scheduler.
+ */
+public class QueryScheduler {
+  // The stateMachine of the QueryExecution owned by this QueryScheduler
+  private QueryStateMachine stateMachine;
+
+  // The fragment instances which should be sent to corresponding Nodes.
+  private List<FragmentInstance> instances;
+
+  public QueryScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
+    this.stateMachine = stateMachine;
+    this.instances = instances;
+  }
+
+  public void start() {}
+
+  // Send the instances to other nodes
+  private void sendFragmentInstances() {}
+
+  // After sending, start to collect the states of these fragment instances
+  private void startMonitorInstances() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index 10ba210..d8ca6bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -16,11 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.exec;
+package org.apache.iotdb.db.mpp.execution;
 
 /**
- * State machine for a QueryExecution. It stores the states for the QueryExecution.
- * Others can register listeners when the state changes of the QueryExecution.
+ * State machine for a QueryExecution. It stores the states for the QueryExecution. Others can
+ * register listeners for state changes of the QueryExecution.
  */
-public class QueryStateMachine {
-}
+public class QueryStateMachine {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
new file mode 100644
index 0000000..8e8d3aa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.mpp.common.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
+public interface Operator extends AutoCloseable {
+  ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
+
+  OperatorContext getOperatorContext();
+
+  /**
+   * Returns a future that will be completed when the operator becomes unblocked. If the operator is
+   * not blocked, this method should return {@code NOT_BLOCKED}.
+   */
+  default ListenableFuture<Void> isBlocked() {
+    return NOT_BLOCKED;
+  }
+
+  /** Returns true if and only if this operator can accept an input page. */
+  boolean needsInput();
+
+  /**
+   * Adds an input page to the operator. This method will only be called if {@code needsInput()}
+   * returns true.
+   */
+  void addInput(TsBlock page);
+
+  /**
+   * Gets an output page from the operator. If no output data is currently available, return null.
+   */
+  TsBlock getOutput();
+
+  /**
+   * After calling this method operator should revoke all reserved revocable memory. As soon as
+   * memory is revoked returned future should be marked as done.
+   *
+   * <p>Spawned threads cannot modify OperatorContext because it's not thread safe. For this purpose
+   * implement {@link #finishMemoryRevoke()}
+   *
+   * <p>Since memory revoking signal is delivered asynchronously to the Operator, implementation
+   * must gracefully handle the case when there no longer is any revocable memory allocated.
+   *
+   * <p>After this method is called on Operator the Driver is disallowed to call any processing
+   * methods on it (isBlocked/needsInput/addInput/getOutput) until {@link #finishMemoryRevoke()} is
+   * called.
+   */
+  default ListenableFuture<Void> startMemoryRevoke() {
+    return NOT_BLOCKED;
+  }
+
+  /**
+   * Clean up and release resources after completed memory revoking. Called by driver once future
+   * returned by startMemoryRevoke is completed.
+   */
+  default void finishMemoryRevoke() {}
+
+  /**
+   * Notifies the operator that no more pages will be added and the operator should finish
+   * processing and flush results. This method will not be called if the Task is already failed or
+   * canceled.
+   */
+  void finish();
+
+  /** Is this operator completely finished processing and no more output pages will be produced. */
+  boolean isFinished();
+
+  /** This method will always be called before releasing the Operator reference. */
+  @Override
+  default void close() throws Exception {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
similarity index 66%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
index 84c2964..c635f74 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
@@ -16,24 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.operator;
 
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
 /**
  * Contains information about {@link Operator} execution.
- * <p>
- * Not thread-safe.
+ *
+ * <p>Not thread-safe.
  */
 public class OperatorContext {
 
-    private final int operatorId;
-    private final PlanNodeId planNodeId;
-    private final String operatorType;
+  private final int operatorId;
+  private final PlanNodeId planNodeId;
+  private final String operatorType;
 
-    public OperatorContext(int operatorId, PlanNodeId planNodeId, String operatorType) {
-        this.operatorId = operatorId;
-        this.planNodeId = planNodeId;
-        this.operatorType = operatorType;
-    }
+  public OperatorContext(int operatorId, PlanNodeId planNodeId, String operatorType) {
+    this.operatorId = operatorId;
+    this.planNodeId = planNodeId;
+    this.operatorType = operatorType;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperator.java
similarity index 90%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperator.java
index eff9fa7..9614c92 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperator.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.operator;
 
-public class SeriesScanOperator {
-}
+public class SeriesScanOperator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
index 91ec40f..9954c74 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
@@ -16,21 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.QueryContext;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+import java.util.List;
 
-public interface ExchangeDataSource extends Closeable {
+public class DistributedQueryPlan {
+  private QueryContext context;
+  private PlanNode<TsBlock> rootNode;
+  private PlanFragment rootFragment;
 
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  // TODO: consider whether this field is necessary when doing the implementation
+  private List<PlanFragment> fragments;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
similarity index 66%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
index 91ec40f..03eb1dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
@@ -16,21 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.Analysis;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+public class DistributionPlanner {
+  private Analysis analysis;
+  private LogicalQueryPlan logicalPlan;
 
-public interface ExchangeDataSource extends Closeable {
+  public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
+    this.analysis = analysis;
+    this.logicalPlan = logicalPlan;
+  }
 
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  public DistributedQueryPlan planFragments() {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
similarity index 69%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
index 91ec40f..0b405b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
@@ -16,21 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan;
 
-import com.google.common.util.concurrent.ListenableFuture;
+public class FragmentInstance {
+  private FragmentInstanceId id;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+  // The reference of PlanFragment which this instance is generated from
+  private PlanFragment fragment;
 
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  // We can add some more params for a specific FragmentInstance
+  // so that different FragmentInstances can own different data ranges.
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
similarity index 70%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
index 10ba210..18181cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,11 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.exec;
+package org.apache.iotdb.db.mpp.plan;
 
-/**
- * State machine for a QueryExecution. It stores the states for the QueryExecution.
- * Others can register listeners when the state changes of the QueryExecution.
- */
-public class QueryStateMachine {
+public class FragmentInstanceId {
+  private String id;
+
+  public FragmentInstanceId(String id) {
+    this.id = id;
+  }
+
+  // A SinkOperator is needed here so that we can know where the result of this instance can be
+  // sent.
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
index 91ec40f..a74644b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
@@ -16,21 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.Analysis;
+import org.apache.iotdb.db.mpp.common.QueryContext;
+import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+import java.util.List;
 
-public interface ExchangeDataSource extends Closeable {
+public class LogicalPlanner {
+  private Analysis analysis;
+  private QueryContext context;
+  private List<PlanOptimizer> optimizers;
 
-    ByteBuffer pollTsBlock();
+  public LogicalPlanner(Analysis analysis, QueryContext context, List<PlanOptimizer> optimizers) {
+    this.analysis = analysis;
+    this.context = context;
+    this.optimizers = optimizers;
+  }
 
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  public LogicalQueryPlan plan() {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
similarity index 66%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
index 91ec40f..5094df6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
@@ -16,21 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.QueryContext;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+/**
+ * LogicalQueryPlan represents a logical query plan. It stores the root node of the corresponding
+ * query plan node tree.
+ */
+public class LogicalQueryPlan {
+  private QueryContext context;
+  private PlanNode<TsBlock> rootNode;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
similarity index 68%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
index 91ec40f..fc49264 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
@@ -16,21 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+// TODO: consider whether it is necessary to make PlanFragment a TreeNode
+/** PlanFragment contains a sub-query of a distributed query. */
+public class PlanFragment {
+  private PlanFragmentId id;
+  private PlanNode<TsBlock> root;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
similarity index 79%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
index eff9fa7..39f8d17 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,7 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan;
 
-public class SeriesScanOperator {
+public class PlanFragmentId {
+  private String id;
+
+  public PlanFragmentId(String id) {
+    this.id = id;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
index 91ec40f..1a3f103 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
@@ -16,21 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.TreeNode;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
+/**
+ * @author xingtanzjr The base class of query executable operators, which is used to compose logical
+ *     query plan. TODO: consider how to restrict the children type for each type of ExecOperator
+ *     TODO: consider to fix the Template type as TsBlock
+ */
+public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
+  private PlanNodeId id;
 
-    @Override
-    void close();
+  public PlanNode(PlanNodeId id) {
+    this.id = id;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
index e694df7..576fd64 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
@@ -14,21 +14,21 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-package org.apache.iotdb.db.query.mpp.plan.node;
+package org.apache.iotdb.db.mpp.plan.node;
 
 public class PlanNodeId {
-    private String id;
-    public PlanNodeId(String id) {
-        this.id = id;
-    }
+  private String id;
+
+  public PlanNodeId(String id) {
+    this.id = id;
+  }
 
-    public String getId() {
-        return this.id;
-    }
+  public String getId() {
+    return this.id;
+  }
 
-    @Override
-    public String toString() {
-        return this.id;
-    }
+  @Override
+  public String toString() {
+    return this.id;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
index eff9fa7..6e70c20 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node;
 
-public class SeriesScanOperator {
-}
+/** A centralized PlanNodeId generator */
+public class PlanNodeIdAllocator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
new file mode 100644
index 0000000..9269545
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.node.process;
+
+import org.apache.iotdb.db.mpp.common.OrderBy;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.common.WithoutPolicy;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+
+import java.util.Map;
+
+/**
+ * DeviceMergeNode is responsible for constructing a device-based view of a set of series and
+ * outputting the result in a specific order. The order could be 'order by device' or 'order by
+ * timestamp'.
+ *
+ * <p>Each output from its children should have the same schema. That means, the columns should be
+ * same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will
+ * contain n+1 columns where the new column is Device column.
+ */
+public class DeviceMergeNode extends ProcessNode {
+  // The order in which this node outputs its result
+  private OrderBy mergeOrder;
+
+  // The policy to decide whether a row should be discarded.
+  // The without policy can be pushed down to the DeviceMergeNode
+  // because we can know whether a row
+  // contains null or not.
+  private WithoutPolicy withoutPolicy;
+
+  // The map from deviceName to the corresponding query result node responsible for that device.
+  // DeviceNode means the node whose output TsBlock contains the data belonging to one device.
+  private Map<String, PlanNode<TsBlock>> childDeviceNodeMap;
+
+  public DeviceMergeNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) {
+    this(id);
+    this.childDeviceNodeMap = deviceNodeMap;
+    this.children.addAll(deviceNodeMap.values());
+  }
+
+  public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) {
+    this.childDeviceNodeMap.put(deviceName, childNode);
+    this.children.add(childNode);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
index 91ec40f..31e57cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
@@ -16,21 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node.process;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.FillPolicy;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+/** FillNode is used to fill the empty field in one row. */
+public class FillNode extends ProcessNode {
 
-public interface ExchangeDataSource extends Closeable {
+  // The policy used to fill the empty fields in a row
+  private FillPolicy fillPolicy;
 
-    ByteBuffer pollTsBlock();
+  public FillNode(PlanNodeId id) {
+    super(id);
+  }
 
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
+    this(id);
+    this.fillPolicy = fillPolicy;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
index 91ec40f..a4cb88c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
@@ -16,21 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node.process;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+/** The FilterNode is responsible for filtering the RowRecord from TsBlock. */
+public class FilterNode extends ProcessNode {
 
-public interface ExchangeDataSource extends Closeable {
+  // The filter
+  private FilterOperator rowFilter;
 
-    ByteBuffer pollTsBlock();
+  public FilterNode(PlanNodeId id) {
+    super(id);
+  }
 
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  public FilterNode(PlanNodeId id, FilterOperator rowFilter) {
+    this(id);
+    this.rowFilter = rowFilter;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
new file mode 100644
index 0000000..538d6d8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+
+/**
+ * This node is responsible for the final aggregation merge operation. It will process the data from
+ * TsBlock row by row. For one row, it will roll up the fields which have the same aggregate
+ * function and belong to one bucket. Here, two columns belong to one bucket when the partial paths
+ * of the devices after rolling up at the specified levels are the same. For example, let's say
+ * there are two columns `root.sg.d1.s1` and `root.sg.d2.s1`. If the group by level parameter is
+ * [0, 1], then these two columns will belong to one bucket and the bucket name is `root.sg.*.s1`.
+ * If the group by level parameter is [0, 2], then these two columns will not belong to one bucket,
+ * and the buckets are `root.*.d1.s1` and `root.*.d2.s1`.
+ */
+public class GroupByLevelNode extends ProcessNode {
+
+  private int[] groupByLevels;
+
+  public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) {
+    super(id);
+    this.groupByLevels = groupByLevels;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
similarity index 65%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
index 91ec40f..9596c1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
@@ -16,21 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node.process;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+/** LimitNode is used to select the top n results. It uses the default order of upstream nodes. */
+public class LimitNode extends ProcessNode {
 
-public interface ExchangeDataSource extends Closeable {
+  // The limit count
+  private int limit;
 
-    ByteBuffer pollTsBlock();
+  public LimitNode(PlanNodeId id) {
+    super(id);
+  }
 
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  public LimitNode(PlanNodeId id, int limit) {
+    this(id);
+    this.limit = limit;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
similarity index 63%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
index 91ec40f..01e0e93 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
@@ -16,21 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node.process;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
+/**
+ * OffsetNode is used to skip the top n results from upstream nodes. It uses the default order of
+ * upstream nodes.
+ */
+public class OffsetNode extends ProcessNode {
 
-    boolean isFinished();
+  // The number of results to skip
+  private int offset;
 
-    ListenableFuture<Void> isBlocked();
+  public OffsetNode(PlanNodeId id) {
+    super(id);
+  }
 
-    @Override
-    void close();
+  public OffsetNode(PlanNodeId id, int offset) {
+    this(id);
+    this.offset = offset;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
similarity index 69%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
index 91ec40f..63e07b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
@@ -16,21 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node.process;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+public class ProcessNode extends PlanNode<TsBlock> {
+  public ProcessNode(PlanNodeId id) {
+    super(id);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
new file mode 100644
index 0000000..9d7b943
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.node.process;
+
+import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+
+import java.util.List;
+
+/**
+ * This node is used to aggregate required series by raw data. The raw data will be input as a
+ * TsBlock. This node will output the series aggregated result represented by TsBlock. Thus, the
+ * columns in output TsBlock will be different from input TsBlock.
+ */
+public class RowBasedSeriesAggregateNode extends ProcessNode {
+  // The parameter of `group by time`
+  // Its value will be null if there is no `group by time` clause.
+  private GroupByTimeParameter groupByTimeParameter;
+
+  // The list of aggregation functions, each FunctionExpression will be output as one column of
+  // result TsBlock
+  // (Currently we only support one series in the aggregation function)
+  // TODO: consider whether FunctionExpression is suitable for the aggregation function
+  private List<FunctionExpression> aggregateFuncList;
+
+  public RowBasedSeriesAggregateNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList) {
+    this(id);
+    this.aggregateFuncList = aggregateFuncList;
+  }
+
+  public RowBasedSeriesAggregateNode(
+      PlanNodeId id,
+      List<FunctionExpression> aggregateFuncList,
+      GroupByTimeParameter groupByTimeParameter) {
+    this(id, aggregateFuncList);
+    this.groupByTimeParameter = groupByTimeParameter;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
similarity index 60%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
index 84c2964..1e83783 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
@@ -16,24 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node.process;
 
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.common.OrderBy;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
 /**
- * Contains information about {@link Operator} execution.
- * <p>
- * Not thread-safe.
+ * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
+ * optimized logical query plan, the sortNode should not appear.
  */
-public class OperatorContext {
+public class SortNode extends ProcessNode {
 
-    private final int operatorId;
-    private final PlanNodeId planNodeId;
-    private final String operatorType;
+  private OrderBy sortOrder;
 
-    public OperatorContext(int operatorId, PlanNodeId planNodeId, String operatorType) {
-        this.operatorId = operatorId;
-        this.planNodeId = planNodeId;
-        this.operatorType = operatorType;
-    }
+  public SortNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public SortNode(PlanNodeId id, OrderBy sortOrder) {
+    this(id);
+    this.sortOrder = sortOrder;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
new file mode 100644
index 0000000..ab48cc4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.node.process;
+
+import org.apache.iotdb.db.mpp.common.OrderBy;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.common.WithoutPolicy;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+
+import java.util.Arrays;
+
+/**
+ * TimeJoinNode is responsible for joining two or more TsBlocks. The join algorithm is like an outer
+ * join by timestamp column. It will join two or more TsBlocks by the timestamp column. The output
+ * result of TimeJoinNode is sorted by timestamp.
+ */
+// TODO: define the TimeJoinMergeNode for distributed plan
+public class TimeJoinNode extends ProcessNode {
+
+  // This parameter indicates the order when executing multiway merge sort.
+  private OrderBy mergeOrder;
+
+  // The policy to decide whether a row should be discarded.
+  // The without policy can be pushed down to the TimeJoinNode
+  // because we can know whether a row
+  // contains null or not.
+  private WithoutPolicy withoutPolicy;
+
+  public TimeJoinNode(PlanNodeId id) {
+    super(id);
+    this.mergeOrder = OrderBy.TIMESTAMP_ASC;
+  }
+
+  public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) {
+    super(id);
+    this.children.addAll(Arrays.asList(children));
+  }
+
+  public void addChild(PlanNode<TsBlock> child) {
+    this.children.add(child);
+  }
+
+  public void setMergeOrder(OrderBy mergeOrder) {
+    this.mergeOrder = mergeOrder;
+  }
+
+  public void setWithoutPolicy(WithoutPolicy withoutPolicy) {
+    this.withoutPolicy = withoutPolicy;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/WithoutNode.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/WithoutNode.java
index 91ec40f..e6365b2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/WithoutNode.java
@@ -16,21 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.node.process;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.WithoutPolicy;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
+/** WithoutNode is used to discard specific rows from upstream node. */
+public class WithoutNode extends ProcessNode {
 
-public interface ExchangeDataSource extends Closeable {
+  // The policy to discard the result from upstream operator
+  private WithoutPolicy discardPolicy;
 
-    ByteBuffer pollTsBlock();
+  public WithoutNode(PlanNodeId id) {
+    super(id);
+  }
 
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+  public WithoutNode(PlanNodeId id, WithoutPolicy discardPolicy) {
+    this(id);
+    this.discardPolicy = discardPolicy;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/CsvSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
similarity index 83%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/CsvSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
index 809ff37..0bac6bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/CsvSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
@@ -16,10 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.mpp.plan.node.sink;
 
-package org.apache.iotdb.db.query.mpp.plan.node.sink;
-
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
 public class CsvSinkNode extends SinkNode {
   public CsvSinkNode(PlanNodeId id) {
@@ -27,12 +26,8 @@ public class CsvSinkNode extends SinkNode {
   }
 
   @Override
-  public void close() throws Exception {
-
-  }
+  public void close() throws Exception {}
 
   @Override
-  public void send() {
-
-  }
+  public void send() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
index c71ce9a..58c3a71 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,25 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.mpp.plan.node.sink;
 
-package org.apache.iotdb.db.query.mpp.plan.node.sink;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * not implemented in current IoTDB yet
- */
-public class ThriftSinkNode extends SinkNode {
-
-  public ThriftSinkNode(PlanNodeId id) {
+public class FragmentSinkNode extends SinkNode {
+  public FragmentSinkNode(PlanNodeId id) {
     super(id);
   }
 
   @Override
-  public void close() throws Exception {}
+  public void send() {}
 
   @Override
-  public void send() {
-
-  }
+  public void close() throws Exception {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/SinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/SinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
index 31fa7ad..f59effb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/SinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.mpp.plan.node.sink;
 
-package org.apache.iotdb.db.query.mpp.plan.node.sink;
-
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
 public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable {
 
-    public SinkNode(PlanNodeId id) {
-        super(id);
-    }
+  public SinkNode(PlanNodeId id) {
+    super(id);
+  }
 
-    public abstract void send();
+  public abstract void send();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
similarity index 83%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
index c71ce9a..f5c48df 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
@@ -16,14 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.mpp.plan.node.sink;
 
-package org.apache.iotdb.db.query.mpp.plan.node.sink;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * not implemented in current IoTDB yet
- */
+/** not implemented in current IoTDB yet */
 public class ThriftSinkNode extends SinkNode {
 
   public ThriftSinkNode(PlanNodeId id) {
@@ -34,7 +31,5 @@ public class ThriftSinkNode extends SinkNode {
   public void close() throws Exception {}
 
   @Override
-  public void send() {
-
-  }
+  public void send() {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
index 5d76beb..a2a0fde 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
@@ -16,13 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.plan.node.source;
 
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-/**
- * Not implemented in current version.
- */
+/** Not implemented in current version. */
 public class CsvSourceNode extends SourceNode {
 
   public CsvSourceNode(PlanNodeId id) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
new file mode 100644
index 0000000..80ea58f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.node.source;
+
+import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * SeriesAggregateOperator is responsible for the aggregation calculation of one series. It will
+ * read the target series and calculate the aggregation result by the aggregation digest or raw data
+ * of this series.
+ *
+ * <p>The aggregation result will be represented as a TsBlock.
+ *
+ * <p>This operator will split data of the target series into many groups by time range and do the
+ * aggregation calculation for each group. Each result will be one row of the result TsBlock. The
+ * timestamp of each row is the start time of the time range group.
+ *
+ * <p>If there is no time range split parameter, the result TsBlock will only contain one row, which
+ * represents the whole aggregation result of this series. And the timestamp will be 0, which is
+ * meaningless.
+ */
+public class SeriesAggregateNode extends SourceNode {
+
+  // The parameter of `group by time`.
+  // Its value will be null if there is no `group by time` clause.
+  private GroupByTimeParameter groupByTimeParameter;
+
+  // The aggregation function, which contains the function name and related series.
+  // (Currently we only support one series in the aggregation function)
+  // TODO: consider whether FunctionExpression is a suitable type for the aggregation function
+  private FunctionExpression aggregateFunc;
+
+  private Filter filter;
+
+  public SeriesAggregateNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) {
+    this(id);
+    this.aggregateFunc = aggregateFunc;
+  }
+
+  public SeriesAggregateNode(
+      PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
+    this(id, aggregateFunc);
+    this.groupByTimeParameter = groupByTimeParameter;
+  }
+
+  @Override
+  public void open() throws Exception {}
+
+  @Override
+  public void close() throws Exception {}
+
+  // This method is used when doing the predicate push-down.
+  // The filter is not put in the constructor because the filter only becomes known in the
+  // predicate push-down stage.
+  public void setFilter(Filter filter) {
+    this.filter = filter;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
index 4b6a187..ccfae5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
@@ -16,19 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.iotdb.db.query.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.plan.node.source;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.query.mpp.common.OrderBy;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.db.mpp.common.OrderBy;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 /**
- * SeriesScanOperator is responsible for read data a specific series. When reading data, the SeriesScanOperator
- * can read the raw data batch by batch. And also, it can leverage the filter and other info to decrease the
- * result set.
+ * SeriesScanOperator is responsible for reading the data of a specific series. When reading data,
+ * the SeriesScanOperator can read the raw data batch by batch. It can also leverage the filter and
+ * other info to reduce the result set.
  *
  * <p>Children type: no child is allowed for SeriesScanNode
  */
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
index 9ee8ff7..c83da97 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.mpp.plan.node.source;
 
-package org.apache.iotdb.db.query.mpp.plan.node.source;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable{
+public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable {
 
   public SourceNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
index 91ec40f..b99c5db 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
@@ -16,21 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.mpp.operator;
+package org.apache.iotdb.db.mpp.plan.optimzation;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.common.QueryContext;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.plan.node.PlanNode;
 
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-
-public interface ExchangeDataSource extends Closeable {
-
-    ByteBuffer pollTsBlock();
-
-    boolean isFinished();
-
-    ListenableFuture<Void> isBlocked();
-
-    @Override
-    void close();
+public interface PlanOptimizer {
+  PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/Analysis.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/Analysis.java
deleted file mode 100644
index e003b0f..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/Analysis.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-/**
- * Analysis used for planning a query.
- * TODO: This class may need to store more info for a query.
- */
-public class Analysis {
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/FillPolicy.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/FillPolicy.java
deleted file mode 100644
index aa272cc..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/FillPolicy.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-public enum FillPolicy {
-    PREVIOUS,
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/GroupByTimeParameter.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/GroupByTimeParameter.java
deleted file mode 100644
index 59b255c..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/GroupByTimeParameter.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-
-/**
- * In single-node IoTDB, the GroupByTimePlan is used to represent the parameter of `group by time`.
- * To avoid ambiguity, we use another name `GroupByTimeParameter` here
- */
-public class GroupByTimeParameter extends GroupByTimePlan {
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/OrderBy.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/OrderBy.java
deleted file mode 100644
index 22dbd03..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/OrderBy.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-/**
- * The traversal order for operators by timestamp
- */
-public enum OrderBy {
-    TIMESTAMP_ASC,
-    TIMESTAMP_DESC,
-    DEVICE_NAME_ASC,
-    DEVICE_NAME_DESC,
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryContext.java
deleted file mode 100644
index 7f8ba05..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryContext.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-/**
- * This class is used to record the context of a query including QueryId, query statement, session info and so on
- */
-public class QueryContext {
-    private String statement;
-    private QueryId queryId;
-    private QuerySession session;
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryId.java
deleted file mode 100644
index 6fdc292..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryId.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-public class QueryId {
-    private String Id;
-
-    public String getId() {
-        return Id;
-    }
-
-    public void setId(String id) {
-        Id = id;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QuerySession.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QuerySession.java
deleted file mode 100644
index 0fb8232..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QuerySession.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-public class QuerySession {
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TreeNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TreeNode.java
deleted file mode 100644
index ec6f664..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TreeNode.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-import java.util.List;
-
-/**
- * @author A simple class to describe the tree style structure of query executable operators
- * @param <T>
- */
-public class TreeNode<T extends TreeNode<T>> {
-    protected List<T> children;
-
-    public T getChild(int i) {
-        return hasChild(i) ? children.get(i) : null;
-    }
-
-    public boolean hasChild(int i) {
-        return children.size() > i;
-    }
-
-    public void addChild(T n) {
-        children.add(n);
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TsBlock.java
deleted file mode 100644
index cb8701f..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TsBlock.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-/**
- * Intermediate result for most of ExecOperators.
- * The Tablet contains data from one or more columns and constructs them as a row based view
- * The columns can be series, aggregation result for one series or scalar value (such as deviceName).
- * The Tablet also contains the metadata to describe the columns.
- *
- * TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
-
-    // Describe the column info
-    private TsBlockMetadata metadata;
-
-    public boolean hasNext() {
-        return false;
-    }
-
-    // Get next row in current tablet
-    public RowRecord getNext() {
-        return null;
-    }
-
-    public TsBlockMetadata getMetadata() {
-        return metadata;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TsBlockMetadata.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TsBlockMetadata.java
deleted file mode 100644
index 21ee1c6..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/TsBlockMetadata.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-import java.util.List;
-
-public class TsBlockMetadata {
-    // list of all columns in current Tablet
-    // The column list not only contains the series column, but also contains other column to construct the final result
-    // set such as timestamp and deviceName
-    private List<String> columnList;
-
-    // Indicate whether the result set should be aligned by device. This parameter can be used for downstream operators
-    // when processing data from current Tablet. The RowRecord produced by Tablet with `alignedByDevice = true` will contain
-    // n + 1 fields which are n series field and 1 deviceName field.
-    // For example, when the FilterOperator execute the filter operation, it may need the deviceName field when matching
-    // the series with corresponding column in Tablet
-    //
-    // If alignedByDevice is true, the owned series should belong to one device
-    private boolean alignedByDevice;
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/WithoutPolicy.java
deleted file mode 100644
index c89ac6f..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/WithoutPolicy.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.iotdb.db.query.mpp.common;
-
-public enum WithoutPolicy {
-    CONTAINS_NULL,
-    ALL_NULL
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/ExecFragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/ExecFragmentInstance.java
deleted file mode 100644
index a4333b8..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/ExecFragmentInstance.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.mpp.exec;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.io.Closeable;
-import io.airlift.units.Duration;
-
-/**
- * ExecutableFragmentInstance encapsulates some methods which are necessary for execution scheduler to run a fragment instance
- */
-public interface ExecFragmentInstance extends Closeable {
-
-    /**
-     * Used to judge whether this fragment instance has any more data to process
-     *
-     * @return true if the FragmentInstance is done, otherwise false.
-     */
-    boolean isFinished();
-
-    /**
-     * run the fragment instance for {@param duration} time slice, the time of this run is likely not to be equal to {@param duration},
-     * the actual run time should be calculated by the caller
-     *
-     * @param duration how long should this fragment instance run
-     * @return the returned ListenableFuture<Void> is used to represent status of this processing
-     *         if isDone() return true, meaning that this fragment instance is not blocked and is ready for next processing
-     *         otherwise, meaning that this fragment instance is blocked and not ready for next processing.
-     */
-    ListenableFuture<Void> processFor(Duration duration);
-
-    /**
-     * @return the information about this Fragment Instance in String format
-     */
-    String getInfo();
-
-    /**
-     * clear resource used by this fragment instance
-     */
-    @Override
-    void close();
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java
deleted file mode 100644
index f046f12..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.mpp.exec;
-
-import org.apache.iotdb.db.query.mpp.common.Analysis;
-import org.apache.iotdb.db.query.mpp.common.QueryContext;
-import org.apache.iotdb.db.query.mpp.common.QueryId;
-import org.apache.iotdb.db.query.mpp.plan.*;
-import org.apache.iotdb.db.query.mpp.plan.optimzation.PlanOptimizer;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * QueryExecution stores all the status of a query which is being prepared or running inside the MPP frame.
- * It takes three main responsibilities:
- *      1. Prepare a query. Transform a query from statement to DistributedQueryPlan with fragment instances.
- *      2. Dispatch all the fragment instances to corresponding physical nodes.
- *      3. Collect and monitor the progress/states of this query.
- */
-public class QueryExecution {
-    private QueryContext context;
-    private QueryScheduler scheduler;
-    private QueryStateMachine stateMachine;
-
-    private List<PlanOptimizer> planOptimizers;
-
-    private Analysis analysis;
-    private LogicalQueryPlan logicalPlan;
-    private DistributedQueryPlan distributedPlan;
-    private List<PlanFragment> fragments;
-    private List<FragmentInstance> fragmentInstances;
-
-    public QueryExecution(QueryContext context) {
-        this.context = context;
-    }
-
-    public void plan() {
-        analyze();
-        doLogicalPlan();
-        doDistributedPlan();
-        planFragmentInstances();
-    }
-
-    public void schedule() {
-        this.scheduler = new QueryScheduler(this.stateMachine, this.fragmentInstances);
-        this.scheduler.start();
-    }
-
-    // Analyze the statement in QueryContext. Generate the analysis this query need
-    public void analyze() {
-        // initialize the variable `analysis`
-
-    }
-
-    // Use LogicalPlanner to do the logical query plan and logical optimization
-    public void doLogicalPlan() {
-        LogicalPlanner planner = new LogicalPlanner(this.analysis, this.context, this.planOptimizers);
-        this.logicalPlan = planner.plan();
-    }
-
-    // Generate the distributed plan and split it into fragments
-    public void doDistributedPlan() {
-        DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
-        this.distributedPlan = planner.planFragments();
-
-    }
-
-    // Convert fragment to detailed instance
-    // And for parallel-able fragment, clone it into several instances with different params.
-    public void planFragmentInstances() {
-
-    }
-
-    /**
-     * This method will be called by the request thread from client connection.
-     * This method will block until one of these conditions occurs:
-     *   1. There is a batch of result
-     *   2. There is no more result
-     *   3. The query has been cancelled
-     *   4. The query is timeout
-     * This method will fetch the result from DataStreamManager use the virtual ResultOperator's ID
-     * (This part will be designed and implemented with DataStreamManager)
-     */
-    public ByteBuffer getBatchResult() {
-        return null;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java
deleted file mode 100644
index e511abe..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.mpp.exec;
-
-import org.apache.iotdb.db.query.mpp.plan.FragmentInstance;
-
-import java.util.List;
-
-/**
- * QueryScheduler is used to dispatch the fragment instances of a query to target nodes. And it will continue to
- * collect and monitor the query execution before the query is finished.
- *
- * Later, we can add more control logic for a QueryExecution such as retry, kill and so on by this scheduler.
- */
-public class QueryScheduler {
-    //The stateMachine of the QueryExecution owned by this QueryScheduler
-    private QueryStateMachine stateMachine;
-
-    // The fragment instances which should be sent to corresponding Nodes.
-    private List<FragmentInstance> instances;
-
-    public QueryScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
-        this.stateMachine = stateMachine;
-        this.instances = instances;
-    }
-
-    public void start() {
-
-    }
-
-    // Send the instances to other nodes
-    private void sendFragmentInstances() {
-
-    }
-
-    // After sending, start to collect the states of these fragment instances
-    private void startMonitorInstances() {
-
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/Operator.java
deleted file mode 100644
index 5dd28c8..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/Operator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.query.mpp.operator;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-
-import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
-
-public interface Operator extends AutoCloseable {
-    ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
-
-
-    OperatorContext getOperatorContext();
-
-    /**
-     * Returns a future that will be completed when the operator becomes
-     * unblocked.  If the operator is not blocked, this method should return
-     * {@code NOT_BLOCKED}.
-     */
-    default ListenableFuture<Void> isBlocked()
-    {
-        return NOT_BLOCKED;
-    }
-
-    /**
-     * Returns true if and only if this operator can accept an input page.
-     */
-    boolean needsInput();
-
-    /**
-     * Adds an input page to the operator.  This method will only be called if
-     * {@code needsInput()} returns true.
-     */
-    void addInput(TsBlock page);
-
-    /**
-     * Gets an output page from the operator.  If no output data is currently
-     * available, return null.
-     */
-    TsBlock getOutput();
-
-    /**
-     * After calling this method operator should revoke all reserved revocable memory.
-     * As soon as memory is revoked returned future should be marked as done.
-     * <p>
-     * Spawned threads cannot modify OperatorContext because it's not thread safe.
-     * For this purpose implement {@link #finishMemoryRevoke()}
-     * <p>
-     * Since memory revoking signal is delivered asynchronously to the Operator, implementation
-     * must gracefully handle the case when there no longer is any revocable memory allocated.
-     * <p>
-     * After this method is called on Operator the Driver is disallowed to call any
-     * processing methods on it (isBlocked/needsInput/addInput/getOutput) until
-     * {@link #finishMemoryRevoke()} is called.
-     */
-    default ListenableFuture<Void> startMemoryRevoke()
-    {
-        return NOT_BLOCKED;
-    }
-
-    /**
-     * Clean up and release resources after completed memory revoking. Called by driver
-     * once future returned by startMemoryRevoke is completed.
-     */
-    default void finishMemoryRevoke() {}
-
-    /**
-     * Notifies the operator that no more pages will be added and the
-     * operator should finish processing and flush results. This method
-     * will not be called if the Task is already failed or canceled.
-     */
-    void finish();
-
-    /**
-     * Is this operator completely finished processing and no more
-     * output pages will be produced.
-     */
-    boolean isFinished();
-
-    /**
-     * This method will always be called before releasing the Operator reference.
-     */
-    @Override
-    default void close() throws Exception {}
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributedQueryPlan.java
deleted file mode 100644
index 2e3bdfc..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributedQueryPlan.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan;
-
-import org.apache.iotdb.db.query.mpp.common.QueryContext;
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-
-import java.util.List;
-
-public class DistributedQueryPlan {
-    private QueryContext context;
-    private PlanNode<TsBlock> rootNode;
-    private PlanFragment rootFragment;
-
-    //TODO: consider whether this field is necessary when do the implementation
-    private List<PlanFragment> fragments;
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributionPlanner.java
deleted file mode 100644
index bf75674..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributionPlanner.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan;
-
-import org.apache.iotdb.db.query.mpp.common.Analysis;
-
-public class DistributionPlanner {
-    private Analysis analysis;
-    private LogicalQueryPlan logicalPlan;
-
-    public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
-        this.analysis = analysis;
-        this.logicalPlan = logicalPlan;
-    }
-
-    public DistributedQueryPlan planFragments() {
-        return null;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/FragmentInstance.java
deleted file mode 100644
index 0d31f31..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/FragmentInstance.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan;
-
-public class FragmentInstance {
-    private FragmentInstanceId id;
-
-    // The reference of PlanFragment which this instance is generated from
-    private PlanFragment fragment;
-
-    // We can add some more params for a specific FragmentInstance
-    // So that we can make different FragmentInstance owns different data range.
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/FragmentInstanceId.java
deleted file mode 100644
index ee6b0b2..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/FragmentInstanceId.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan;
-
-public class FragmentInstanceId {
-    private String id;
-    public FragmentInstanceId(String id) {
-        this.id = id;
-    }
-
-    //A SinkOperator is needed here. So that we can know where the result of this instance can be sent
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalPlanner.java
deleted file mode 100644
index a400979..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalPlanner.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan;
-
-import org.apache.iotdb.db.query.mpp.common.Analysis;
-import org.apache.iotdb.db.query.mpp.common.QueryContext;
-import org.apache.iotdb.db.query.mpp.plan.optimzation.PlanOptimizer;
-
-import java.util.List;
-
-public class LogicalPlanner {
-    private Analysis analysis;
-    private QueryContext context;
-    private List<PlanOptimizer> optimizers;
-
-    public LogicalPlanner(Analysis analysis, QueryContext context, List<PlanOptimizer> optimizers) {
-        this.analysis = analysis;
-        this.context = context;
-        this.optimizers = optimizers;
-    }
-    
-    public LogicalQueryPlan plan() {
-        return null;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalQueryPlan.java
deleted file mode 100644
index 7820fbd..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalQueryPlan.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan;
-
-import org.apache.iotdb.db.query.mpp.common.QueryContext;
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-
-/**
- * LogicalQueryPlan represents a logical query plan. It stores the root node of corresponding query plan node tree.
- */
-public class LogicalQueryPlan {
-    private QueryContext context;
-    private PlanNode<TsBlock> rootNode;
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanFragment.java
deleted file mode 100644
index 2ac1592..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanFragment.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan;
-
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-
-//TODO: consider whether it is necessary to make PlanFragment as a TreeNode
-/**
- * PlanFragment contains a sub-query of distributed query.
- */
-public class PlanFragment {
-    private PlanFragmentId id;
-    private PlanNode<TsBlock> root;
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanFragmentId.java
deleted file mode 100644
index 944b620..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanFragmentId.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan;
-
-public class PlanFragmentId {
-    private String id;
-    public PlanFragmentId(String id) {
-        this.id = id;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNode.java
deleted file mode 100644
index 5b4a62f..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNode.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node;
-
-
-import org.apache.iotdb.db.query.mpp.common.TreeNode;
-
-/**
- * @author xingtanzjr
- * The base class of query executable operators, which is used to compose logical query plan.
- * TODO: consider how to restrict the children type for each type of ExecOperator
- * TODO: consider to fix the Template type as TsBlock
- */
-public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
-    private PlanNodeId id;
-    public PlanNode(PlanNodeId id) {
-        this.id = id;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeIdAllocator.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeIdAllocator.java
deleted file mode 100644
index 738e202..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeIdAllocator.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node;
-
-/**
- * A centralized PlanNodeId generator
- */
-public class PlanNodeIdAllocator {
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/DeviceMergeNode.java
deleted file mode 100644
index 91dac51..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/DeviceMergeNode.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.common.OrderBy;
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-import java.util.Map;
-
-/**
- * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And output the result with
- * specific order. The order could be 'order by device' or 'order by timestamp'
- *
- * Each output from its children should have the same schema. That means, the columns should be same between these TsBlocks.
- * If the input TsBlock contains n columns, the device-based view will contain n+1 columns where the new column is Device
- * column.
- *
- */
-public class DeviceMergeNode extends ProcessNode {
-    // The output order of the result produced by this operator
-    private OrderBy mergeOrder;
-
-    // The policy to decide whether a row should be discarded
-    // The without policy can be pushed down to the DeviceMergeNode because we can know whether a row contains
-    // null or not.
-    private WithoutPolicy withoutPolicy;
-
-    // The map from deviceName to corresponding query result node responsible for that device.
-    // DeviceNode means the node whose output TsBlock contains the data belonged to one device.
-    private Map<String, PlanNode<TsBlock>> childDeviceNodeMap;
-
-    public DeviceMergeNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) {
-        this(id);
-        this.childDeviceNodeMap = deviceNodeMap;
-        this.children.addAll(deviceNodeMap.values());
-    }
-
-    public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) {
-        this.childDeviceNodeMap.put(deviceName, childNode);
-        this.children.add(childNode);
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FillNode.java
deleted file mode 100644
index 08cfd61..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FillNode.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.common.FillPolicy;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * FillNode is used to fill the empty field in one row.
- *
- */
-public class FillNode extends ProcessNode {
-
-    // The policy used to fill the empty fields in rows from the upstream node
-    private FillPolicy fillPolicy;
-
-    public FillNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
-        this(id);
-        this.fillPolicy = fillPolicy;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FilterNode.java
deleted file mode 100644
index 6a5641d..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FilterNode.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * The FilterNode is responsible to filter the RowRecord from TsBlock.
- */
-public class FilterNode extends ProcessNode {
-
-    // The filter
-    private FilterOperator rowFilter;
-
-    public FilterNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public FilterNode(PlanNodeId id, FilterOperator rowFilter) {
-        this(id);
-        this.rowFilter = rowFilter;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/GroupByLevelNode.java
deleted file mode 100644
index b9d83ca..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/GroupByLevelNode.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * This node is responsible for the final aggregation merge operation.
- * It will process the data from TsBlock row by row.
- * For one row, it will rollup the fields which have the same aggregate function and belong to one bucket.
- * Here, two columns belong to one bucket when their partial device paths, after rolling up at the specified levels,
- * are the same.
- * For example, let's say there are two columns `root.sg.d1.s1` and `root.sg.d2.s1`.
- * If the group by level parameter is [0, 1], then these two columns will belong to one bucket and the bucket name
- * is `root.sg.*.s1`.
- * If the group by level parameter is [0, 2], then these two columns will not belong to one bucket. And the total buckets
- * are `root.*.d1.s1` and `root.*.d2.s1`
- */
-public class GroupByLevelNode extends ProcessNode {
-
-    private int[] groupByLevels;
-
-    public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) {
-        super(id);
-        this.groupByLevels = groupByLevels;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/LimitNode.java
deleted file mode 100644
index 35c1d2b..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/LimitNode.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * LimitNode is used to select top n result. It uses the default order of upstream nodes
- *
- */
-public class LimitNode extends ProcessNode {
-
-    // The limit count
-    private int limit;
-
-    public LimitNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public LimitNode(PlanNodeId id, int limit) {
-        this(id);
-        this.limit = limit;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/OffsetNode.java
deleted file mode 100644
index e70a766..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/OffsetNode.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of upstream nodes
- *
- */
-public class OffsetNode extends ProcessNode {
-
-    // The number of results to skip
-    private int offset;
-
-    public OffsetNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public OffsetNode(PlanNodeId id, int offset) {
-        this(id);
-        this.offset = offset;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/ProcessNode.java
deleted file mode 100644
index 17ce248..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/ProcessNode.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-public class ProcessNode extends PlanNode<TsBlock> {
-    public ProcessNode(PlanNodeId id) {
-        super(id);
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/RowBasedSeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
deleted file mode 100644
index 18b012e..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
-import org.apache.iotdb.db.query.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-import java.util.List;
-
-/**
- * This node is used to aggregate required series by raw data.
- * The raw data will be input as a TsBlock. This node will output the series aggregated result represented by TsBlock
- * Thus, the columns in output TsBlock will be different from input TsBlock.
- */
-public class RowBasedSeriesAggregateNode extends ProcessNode {
-    // The parameter of `group by time`
-    // Its value will be null if there is no `group by time` clause,
-    private GroupByTimeParameter groupByTimeParameter;
-
-    // The list of aggregation functions, each FunctionExpression will be output as one column of result TsBlock
-    // (Currently we only support one series in the aggregation function)
-    // TODO: consider whether FunctionExpression is a suitable representation for the aggregation function
-    private List<FunctionExpression> aggregateFuncList;
-
-    public RowBasedSeriesAggregateNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList) {
-        this(id);
-        this.aggregateFuncList = aggregateFuncList;
-    }
-
-    public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList, GroupByTimeParameter groupByTimeParameter) {
-        this(id, aggregateFuncList);
-        this.groupByTimeParameter = groupByTimeParameter;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/SortNode.java
deleted file mode 100644
index 430df2f..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/SortNode.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.common.OrderBy;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * In general, the parameter in sortNode should be pushed down to the upstream operators.
- * In our optimized logical query plan, the sortNode should not appear.
- */
-public class SortNode extends ProcessNode {
-
-    private OrderBy sortOrder;
-
-    public SortNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public SortNode(PlanNodeId id, OrderBy sortOrder) {
-        this(id);
-        this.sortOrder = sortOrder;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/TimeJoinNode.java
deleted file mode 100644
index c37704a..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/TimeJoinNode.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.common.OrderBy;
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-import java.util.Arrays;
-
-/**
- * TimeJoinOperator is responsible for join two or more TsBlock.
- * The join algorithm is like outer join by timestamp column. It will join two or more TsBlock by Timestamp column.
- * The output result of TimeJoinOperator is sorted by timestamp
- */
-//TODO: define the TimeJoinMergeNode for distributed plan
-public class TimeJoinNode extends ProcessNode {
-
-    // This parameter indicates the order when executing multiway merge sort.
-    private OrderBy mergeOrder;
-
-    // The policy to decide whether a row should be discarded
-    // The without policy can be pushed down to the TimeJoinOperator because we can know whether a row contains
-    // null or not.
-    private WithoutPolicy withoutPolicy;
-
-    public TimeJoinNode(PlanNodeId id) {
-        super(id);
-        this.mergeOrder = OrderBy.TIMESTAMP_ASC;
-    }
-
-    public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) {
-        super(id);
-        this.children.addAll(Arrays.asList(children));
-    }
-
-    public void addChild(PlanNode<TsBlock> child) {
-        this.children.add(child);
-    }
-
-    public void setMergeOrder(OrderBy mergeOrder) {
-        this.mergeOrder = mergeOrder;
-    }
-
-    public void setWithoutPolicy(WithoutPolicy withoutPolicy) {
-        this.withoutPolicy = withoutPolicy;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/WithoutNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/WithoutNode.java
deleted file mode 100644
index 04b0380..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/WithoutNode.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.process;
-
-import org.apache.iotdb.db.query.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * WithoutNode is used to discard specific rows from upstream node.
- */
-public class WithoutNode extends ProcessNode {
-
-    // The policy to discard the result from upstream operator
-    private WithoutPolicy discardPolicy;
-
-    public WithoutNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public WithoutNode(PlanNodeId id, WithoutPolicy discardPolicy) {
-        this(id);
-        this.discardPolicy = discardPolicy;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/FragmentSinkNode.java
deleted file mode 100644
index ce0bd3c..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/FragmentSinkNode.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.node.sink;
-
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-public class FragmentSinkNode extends SinkNode {
-    public FragmentSinkNode(PlanNodeId id) {
-        super(id);
-    }
-
-    @Override
-    public void send() {
-
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
deleted file mode 100644
index ec849f0..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.query.mpp.plan.node.source;
-
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
-import org.apache.iotdb.db.query.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. It will read the
- * target series and calculate the aggregation result by the aggregation digest or raw data of this series.
- *
- * The aggregation result will be represented as a TsBlock
- *
- * This operator will split data of the target series into many groups by time range and do the aggregation calculation
- * for each group. Each result will be one row of the result TsBlock. The timestamp of each row is the start time of the
- * time range group.
- *
- * If there is no time range split parameter, the result TsBlock will only contain one row, which represents the whole
- * aggregation result of this series. And the timestamp will be 0, which is meaningless.
- */
-public class SeriesAggregateNode extends SourceNode {
-
-    // The parameter of `group by time`
-    // Its value will be null if there is no `group by time` clause,
-    private GroupByTimeParameter groupByTimeParameter;
-
-    // The aggregation function, which contains the function name and related series.
-    // (Currently we only support one series in the aggregation function)
-    // TODO: consider whether FunctionExpression is a suitable representation for the aggregation function
-    private FunctionExpression aggregateFunc;
-
-    private Filter filter;
-
-    public SeriesAggregateNode(PlanNodeId id) {
-        super(id);
-    }
-
-    public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) {
-        this(id);
-        this.aggregateFunc = aggregateFunc;
-    }
-
-    public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
-        this(id, aggregateFunc);
-        this.groupByTimeParameter = groupByTimeParameter;
-    }
-
-    @Override
-    public void open() throws Exception {
-
-    }
-
-    @Override
-    public void close() throws Exception {
-
-    }
-
-    // This method is used when do the PredicatePushDown.
-    // The filter is not put in the constructor because the filter is only clear in the predicate push-down stage
-    public void setFilter(Filter filter) {
-        this.filter = filter;
-    }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/optimzation/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/optimzation/PlanOptimizer.java
deleted file mode 100644
index 8da75d0..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/optimzation/PlanOptimizer.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.iotdb.db.query.mpp.plan.optimzation;
-
-import org.apache.iotdb.db.query.mpp.common.QueryContext;
-import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
-
-public interface PlanOptimizer {
-    PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
-}

[iotdb] 01/02: add some interface

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e7d8a198073e05264871d7ee2fdac9bf0829625e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Mar 15 21:33:04 2022 +0800

    add some interface
---
 pom.xml                                            |   5 +
 server/pom.xml                                     |   4 +
 .../iotdb/db/query/mpp/exec/Coordinator.java       |  18 ++++
 .../db/query/mpp/exec/ExecFragmentInstance.java    |  59 ++++++++++++
 .../iotdb/db/query/mpp/exec/QueryExecution.java    |  18 ++++
 .../iotdb/db/query/mpp/exec/QueryScheduler.java    |  18 ++++
 .../iotdb/db/query/mpp/exec/QueryStateMachine.java |  18 ++++
 .../ExchangeDataSource.java}                       |  26 +++---
 .../iotdb/db/query/mpp/operator/Operator.java      | 102 +++++++++++++++++++++
 .../OperatorContext.java}                          |  27 +++---
 .../SeriesScanOperator.java}                       |  20 +---
 .../query/mpp/plan/node/source/CsvSourceNode.java  |   1 -
 .../mpp/plan/node/source/SeriesAggregateNode.java  |  19 ++++
 .../query/mpp/plan/node/source/SeriesScanNode.java |  39 ++++++--
 14 files changed, 322 insertions(+), 52 deletions(-)

diff --git a/pom.xml b/pom.xml
index 82b90f5..68131dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -495,6 +495,11 @@
             </dependency>
             <dependency>
                 <groupId>io.airlift</groupId>
+                <artifactId>units</artifactId>
+                <version>1.6</version>
+            </dependency>
+            <dependency>
+                <groupId>io.airlift</groupId>
                 <artifactId>airline</artifactId>
                 <version>${airline.version}</version>
             </dependency>
diff --git a/server/pom.xml b/server/pom.xml
index 97ad1ae..6db8881 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -87,6 +87,10 @@
         </dependency>
         <dependency>
             <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
             <artifactId>airline</artifactId>
             <exclusions>
                 <exclusion>
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java
index 3c8c2b9..b76bc8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.query.mpp.exec;
 
 import org.apache.iotdb.db.query.mpp.common.QueryId;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/ExecFragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/ExecFragmentInstance.java
new file mode 100644
index 0000000..a4333b8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/ExecFragmentInstance.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.mpp.exec;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.io.Closeable;
+import io.airlift.units.Duration;
+
+/**
+ * ExecFragmentInstance encapsulates the methods that the execution scheduler needs in order to run a fragment instance.
+ */
+public interface ExecFragmentInstance extends Closeable {
+
+    /**
+     * Used to judge whether this fragment instance has any more data to process
+     *
+     * @return true if the FragmentInstance is done, otherwise false.
+     */
+    boolean isFinished();
+
+    /**
+     * Runs the fragment instance for a time slice of {@code duration}. The actual run time may differ from {@code duration}
+     * and should be measured by the caller.
+     *
+     * @param duration how long should this fragment instance run
+     * @return a {@code ListenableFuture<Void>} that represents the status of this processing:
+     *         if isDone() returns true, this fragment instance is not blocked and is ready for the next processing;
+     *         otherwise, this fragment instance is blocked and is not ready for the next processing.
+     */
+    ListenableFuture<Void> processFor(Duration duration);
+
+    /**
+     * @return the information about this Fragment Instance in String format
+     */
+    String getInfo();
+
+    /**
+     * Releases the resources used by this fragment instance.
+     */
+    @Override
+    void close();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java
index 708e186..f046f12 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.query.mpp.exec;
 
 import org.apache.iotdb.db.query.mpp.common.Analysis;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java
index 6c73244..e511abe 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.query.mpp.exec;
 
 import org.apache.iotdb.db.query.mpp.plan.FragmentInstance;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
index c5effe2..10ba210 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.query.mpp.exec;
 
 /**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
similarity index 64%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
copy to server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
index f6dc714..91ec40f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/ExchangeDataSource.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,23 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.query.mpp.operator;
 
-package org.apache.iotdb.db.query.mpp.plan.node.source;
+import com.google.common.util.concurrent.ListenableFuture;
 
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
+import java.io.Closeable;
+import java.nio.ByteBuffer;
 
-/**
- * Not implemented in current version.
- */
-public class CsvSourceNode extends SourceNode {
+public interface ExchangeDataSource extends Closeable {
+
+    ByteBuffer pollTsBlock();
 
-  public CsvSourceNode(PlanNodeId id) {
-    super(id);
-  }
+    boolean isFinished();
 
-  @Override
-  public void close() throws Exception {}
+    ListenableFuture<Void> isBlocked();
 
-  @Override
-  public void open() throws Exception {}
+    @Override
+    void close();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/Operator.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/Operator.java
new file mode 100644
index 0000000..5dd28c8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/Operator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.mpp.operator;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.query.mpp.common.TsBlock;
+
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
+public interface Operator extends AutoCloseable {
+    ListenableFuture<Void> NOT_BLOCKED = immediateVoidFuture();
+
+
+    OperatorContext getOperatorContext();
+
+    /**
+     * Returns a future that will be completed when the operator becomes
+     * unblocked.  If the operator is not blocked, this method should return
+     * {@code NOT_BLOCKED}.
+     */
+    default ListenableFuture<Void> isBlocked()
+    {
+        return NOT_BLOCKED;
+    }
+
+    /**
+     * Returns true if and only if this operator can accept an input page.
+     */
+    boolean needsInput();
+
+    /**
+     * Adds an input page to the operator.  This method will only be called if
+     * {@code needsInput()} returns true.
+     */
+    void addInput(TsBlock page);
+
+    /**
+     * Gets an output page from the operator.  If no output data is currently
+     * available, return null.
+     */
+    TsBlock getOutput();
+
+    /**
+     * After calling this method operator should revoke all reserved revocable memory.
+     * As soon as memory is revoked returned future should be marked as done.
+     * <p>
+     * Spawned threads cannot modify OperatorContext because it's not thread safe.
+     * For this purpose implement {@link #finishMemoryRevoke()}
+     * <p>
+     * Since memory revoking signal is delivered asynchronously to the Operator, implementation
+     * must gracefully handle the case when there no longer is any revocable memory allocated.
+     * <p>
+     * After this method is called on Operator the Driver is disallowed to call any
+     * processing methods on it (isBlocked/needsInput/addInput/getOutput) until
+     * {@link #finishMemoryRevoke()} is called.
+     */
+    default ListenableFuture<Void> startMemoryRevoke()
+    {
+        return NOT_BLOCKED;
+    }
+
+    /**
+     * Clean up and release resources after completed memory revoking. Called by driver
+     * once future returned by startMemoryRevoke is completed.
+     */
+    default void finishMemoryRevoke() {}
+
+    /**
+     * Notifies the operator that no more pages will be added and the
+     * operator should finish processing and flush results. This method
+     * will not be called if the Task is already failed or canceled.
+     */
+    void finish();
+
+    /**
+     * Is this operator completely finished processing and no more
+     * output pages will be produced.
+     */
+    boolean isFinished();
+
+    /**
+     * This method will always be called before releasing the Operator reference.
+     */
+    @Override
+    default void close() throws Exception {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
copy to server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
index f6dc714..84c2964 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/OperatorContext.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,23 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.iotdb.db.query.mpp.plan.node.source;
+package org.apache.iotdb.db.query.mpp.operator;
 
 import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
- * Not implemented in current version.
+ * Contains information about {@link Operator} execution.
+ * <p>
+ * Not thread-safe.
  */
-public class CsvSourceNode extends SourceNode {
-
-  public CsvSourceNode(PlanNodeId id) {
-    super(id);
-  }
+public class OperatorContext {
 
-  @Override
-  public void close() throws Exception {}
+    private final int operatorId;
+    private final PlanNodeId planNodeId;
+    private final String operatorType;
 
-  @Override
-  public void open() throws Exception {}
+    public OperatorContext(int operatorId, PlanNodeId planNodeId, String operatorType) {
+        this.operatorId = operatorId;
+        this.planNodeId = planNodeId;
+        this.operatorType = operatorType;
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
similarity index 68%
copy from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
copy to server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
index f6dc714..eff9fa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/operator/SeriesScanOperator.java
@@ -16,23 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.db.query.mpp.operator;
 
-package org.apache.iotdb.db.query.mpp.plan.node.source;
-
-import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
-
-/**
- * Not implemented in current version.
- */
-public class CsvSourceNode extends SourceNode {
-
-  public CsvSourceNode(PlanNodeId id) {
-    super(id);
-  }
-
-  @Override
-  public void close() throws Exception {}
-
-  @Override
-  public void open() throws Exception {}
+public class SeriesScanOperator {
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
index f6dc714..5d76beb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.query.mpp.plan.node.source;
 
 import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
index 3b19af1..ec849f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.db.query.mpp.plan.node.source;
 
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
index 8ecb6de..4b6a187 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
@@ -1,5 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.db.query.mpp.plan.node.source;
 
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.query.mpp.common.OrderBy;
 import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -15,15 +35,18 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 public class SeriesScanNode extends SourceNode {
 
   // The path of the target series which will be scanned.
-  private Path seriesPath;
+  private PartialPath seriesPath;
 
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
   private OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
 
-  // Filter data in current series.
-  private Filter filter;
+  // Time filter for the current series; null if there is no time filter.
+  private Filter timeFilter;
+
+  // Value filter for the current series; null if there is no value filter.
+  private Filter valueFilter;
 
   // Limit for result set. The default value is -1, which means no limit
   private int limit;
@@ -31,13 +54,17 @@ public class SeriesScanNode extends SourceNode {
   // offset for result set. The default value is 0
   private int offset;
 
-  public SeriesScanNode(PlanNodeId id, Path seriesPath) {
+  public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
   }
 
-  public void setFilter(Filter filter) {
-    this.filter = filter;
+  public void setTimeFilter(Filter timeFilter) {
+    this.timeFilter = timeFilter;
+  }
+
+  public void setValueFilter(Filter valueFilter) {
+    this.valueFilter = valueFilter;
   }
 
   @Override