You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/14 09:07:14 UTC

[iotdb] branch xingtanzjr/mpp-query-basis updated: tmp saved for queryExecution frame

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/mpp-query-basis by this push:
     new dc2bbbd  tmp saved for queryExecution frame
dc2bbbd is described below

commit dc2bbbdb8359cee68b3ab6be26a0c675f13187e7
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Mar 14 17:06:24 2022 +0800

    tmp saved for queryExecution frame
---
 .../apache/iotdb/db/query/mpp/common/Analysis.java |  8 ++++
 .../iotdb/db/query/mpp/common/QueryContext.java    | 10 +++++
 .../apache/iotdb/db/query/mpp/common/QueryId.java  | 13 ++++++
 .../iotdb/db/query/mpp/common/QuerySession.java    |  4 ++
 .../iotdb/db/query/mpp/exec/Coordinator.java       |  5 +++
 .../iotdb/db/query/mpp/exec/QueryExecution.java    | 51 ++++++++++++++++++++++
 .../iotdb/db/query/mpp/exec/QueryScheduler.java    |  8 ++++
 .../iotdb/db/query/mpp/exec/QueryStateMachine.java |  8 ++++
 .../db/query/mpp/plan/DistributionPlanner.java     |  4 ++
 .../iotdb/db/query/mpp/plan/LogicalPlanner.java    | 23 ++++++++++
 .../iotdb/db/query/mpp/plan/LogicalQueryPlan.java  | 13 ++++++
 .../db/query/mpp/plan/{ => node}/PlanNode.java     |  3 +-
 .../db/query/mpp/plan/{ => node}/PlanNodeId.java   |  2 +-
 .../query/mpp/plan/node/PlanNodeIdAllocator.java   |  7 +++
 .../plan/{ => node}/process/DeviceMergeNode.java   |  6 +--
 .../mpp/plan/{ => node}/process/FillNode.java      |  4 +-
 .../mpp/plan/{ => node}/process/FilterNode.java    |  4 +-
 .../plan/{ => node}/process/GroupByLevelNode.java  |  4 +-
 .../mpp/plan/{ => node}/process/LimitNode.java     |  4 +-
 .../mpp/plan/{ => node}/process/OffsetNode.java    |  4 +-
 .../mpp/plan/{ => node}/process/ProcessNode.java   |  6 +--
 .../process/RowBasedSeriesAggregateNode.java       |  4 +-
 .../mpp/plan/{ => node}/process/SortNode.java      |  4 +-
 .../mpp/plan/{ => node}/process/TimeJoinNode.java  |  6 +--
 .../mpp/plan/{ => node}/process/WithoutNode.java   |  4 +-
 .../mpp/plan/{ => node}/sink/CsvSinkNode.java      |  4 +-
 .../mpp/plan/{ => node}/sink/FragmentSinkNode.java |  4 +-
 .../query/mpp/plan/{ => node}/sink/SinkNode.java   |  6 +--
 .../mpp/plan/{ => node}/sink/ThriftSinkNode.java   |  4 +-
 .../mpp/plan/{ => node}/source/CsvSourceNode.java  |  4 +-
 .../{ => node}/source/SeriesAggregateNode.java     |  4 +-
 .../mpp/plan/{ => node}/source/SeriesScanNode.java |  4 +-
 .../mpp/plan/{ => node}/source/SourceNode.java     |  6 +--
 .../query/mpp/plan/optimzation/PlanOptimizer.java  |  9 ++++
 34 files changed, 209 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/Analysis.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/Analysis.java
new file mode 100644
index 0000000..e003b0f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/Analysis.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.query.mpp.common;
+
+/**
+ * Analysis used for planning a query.
+ * TODO: This class may need to store more info for a query.
+ */
+public class Analysis {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryContext.java
new file mode 100644
index 0000000..7f8ba05
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryContext.java
@@ -0,0 +1,10 @@
+package org.apache.iotdb.db.query.mpp.common;
+
+/**
+ * This class records the context of a query, including the QueryId, query statement, session info and so on.
+ */
+public class QueryContext {
+    private String statement;
+    private QueryId queryId;
+    private QuerySession session;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryId.java
new file mode 100644
index 0000000..6fdc292
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QueryId.java
@@ -0,0 +1,13 @@
+package org.apache.iotdb.db.query.mpp.common;
+
+public class QueryId {
+    private String Id;
+
+    public String getId() {
+        return Id;
+    }
+
+    public void setId(String id) {
+        Id = id;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QuerySession.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QuerySession.java
new file mode 100644
index 0000000..0fb8232
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/common/QuerySession.java
@@ -0,0 +1,4 @@
+package org.apache.iotdb.db.query.mpp.common;
+
+public class QuerySession {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java
new file mode 100644
index 0000000..90eec35
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/Coordinator.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.db.query.mpp.exec;
+
+public class Coordinator {
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java
new file mode 100644
index 0000000..71b49b2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryExecution.java
@@ -0,0 +1,51 @@
+package org.apache.iotdb.db.query.mpp.exec;
+
+import org.apache.iotdb.db.query.mpp.common.Analysis;
+import org.apache.iotdb.db.query.mpp.common.QueryContext;
+import org.apache.iotdb.db.query.mpp.common.QueryId;
+import org.apache.iotdb.db.query.mpp.plan.LogicalPlanner;
+import org.apache.iotdb.db.query.mpp.plan.optimzation.PlanOptimizer;
+
+import java.util.List;
+
+/**
+ * QueryExecution stores all the status of a query which is being prepared or running inside the MPP frame.
+ * It takes three main responsibilities:
+ *      1. Prepare a query. Transform a query from statement to DistributedQueryPlan with fragment instances.
+ *      2. Dispatch all the fragment instances to corresponding physical nodes.
+ *      3. Collect and monitor the progress/states of this query.
+ */
+public class QueryExecution {
+    private QueryContext context;
+    private QueryScheduler scheduler;
+    private QueryStateMachine stateMachine;
+
+    private List<PlanOptimizer> planOptimizers;
+
+    private Analysis analysis;
+
+    public QueryExecution(QueryContext context) {
+        this.context = context;
+    }
+
+    // Analyze the statement in QueryContext. Generate the analysis this query needs
+    public void analyze() {
+        // initialize the variable `analysis`
+    }
+
+    // Use LogicalPlanner to do the logical query plan and logical optimization
+    public void doLogicalPlan() {
+
+    }
+
+    // Generate the distributed plan and split it into fragments
+    public void doDistributedPlan() {
+
+    }
+
+    // Convert fragment to detailed instance
+    // For a parallelizable fragment, clone it into several instances with different params.
+    public void planFragmentInstances() {
+
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java
new file mode 100644
index 0000000..e70c164
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryScheduler.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.query.mpp.exec;
+
+/**
+ * QueryScheduler is used to dispatch the fragment instances of a query to target nodes. It then continues to
+ * collect and monitor the query execution until the query is finished.
+ */
+public class QueryScheduler {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
new file mode 100644
index 0000000..c5effe2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/exec/QueryStateMachine.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.query.mpp.exec;
+
+/**
+ * State machine for a QueryExecution. It stores the states for the QueryExecution.
+ * Others can register listeners to be notified when the state of the QueryExecution changes.
+ */
+public class QueryStateMachine {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributionPlanner.java
new file mode 100644
index 0000000..f269c35
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/DistributionPlanner.java
@@ -0,0 +1,4 @@
+package org.apache.iotdb.db.query.mpp.plan;
+
+public class DistributionPlanner {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalPlanner.java
new file mode 100644
index 0000000..8d17ca2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalPlanner.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.db.query.mpp.plan;
+
+import org.apache.iotdb.db.query.mpp.common.Analysis;
+import org.apache.iotdb.db.query.mpp.common.QueryContext;
+import org.apache.iotdb.db.query.mpp.plan.optimzation.PlanOptimizer;
+
+import java.util.List;
+
+public class LogicalPlanner {
+    private Analysis analysis;
+    private QueryContext context;
+    private List<PlanOptimizer> optimizers;
+
+    public LogicalPlanner(Analysis analysis, QueryContext context, List<PlanOptimizer> optimizers) {
+        this.analysis = analysis;
+        this.context = context;
+        this.optimizers = optimizers;
+    }
+
+    public LogicalQueryPlan plan() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalQueryPlan.java
new file mode 100644
index 0000000..7820fbd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/LogicalQueryPlan.java
@@ -0,0 +1,13 @@
+package org.apache.iotdb.db.query.mpp.plan;
+
+import org.apache.iotdb.db.query.mpp.common.QueryContext;
+import org.apache.iotdb.db.query.mpp.common.TsBlock;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
+
+/**
+ * LogicalQueryPlan represents a logical query plan. It stores the root node of corresponding query plan node tree.
+ */
+public class LogicalQueryPlan {
+    private QueryContext context;
+    private PlanNode<TsBlock> rootNode;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNode.java
similarity index 80%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNode.java
index 07df26c..5b4a62f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNode.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.mpp.plan;
+package org.apache.iotdb.db.query.mpp.plan.node;
 
 
 import org.apache.iotdb.db.query.mpp.common.TreeNode;
@@ -7,6 +7,7 @@ import org.apache.iotdb.db.query.mpp.common.TreeNode;
  * @author xingtanzjr
  * The base class of query executable operators, which is used to compose logical query plan.
  * TODO: consider how to restrict the children type for each type of ExecOperator
+ * TODO: consider fixing the generic type parameter T as TsBlock
  */
 public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
     private PlanNodeId id;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeId.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanNodeId.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeId.java
index b67344d..e694df7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeId.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.iotdb.db.query.mpp.plan;
+package org.apache.iotdb.db.query.mpp.plan.node;
 
 public class PlanNodeId {
     private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeIdAllocator.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeIdAllocator.java
new file mode 100644
index 0000000..738e202
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/PlanNodeIdAllocator.java
@@ -0,0 +1,7 @@
+package org.apache.iotdb.db.query.mpp.plan.node;
+
+/**
+ * A centralized PlanNodeId generator
+ */
+public class PlanNodeIdAllocator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/DeviceMergeNode.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/DeviceMergeNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/DeviceMergeNode.java
index c24d5e8..91dac51 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/DeviceMergeNode.java
@@ -1,10 +1,10 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
 import org.apache.iotdb.db.query.mpp.common.OrderBy;
 import org.apache.iotdb.db.query.mpp.common.TsBlock;
 import org.apache.iotdb.db.query.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.query.mpp.plan.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 import java.util.Map;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FillNode.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/FillNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FillNode.java
index 9ce2ae0..08cfd61 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FillNode.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
 import org.apache.iotdb.db.query.mpp.common.FillPolicy;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * FillNode is used to fill the empty field in one row.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FilterNode.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/FilterNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FilterNode.java
index 12e10e3..6a5641d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/FilterNode.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * The FilterNode is responsible to filter the RowRecord from TsBlock.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/GroupByLevelNode.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/GroupByLevelNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/GroupByLevelNode.java
index c5341a1..b9d83ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/GroupByLevelNode.java
@@ -1,6 +1,6 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * This node is responsible for the final aggregation merge operation.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/LimitNode.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/LimitNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/LimitNode.java
index ff8087b..35c1d2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/LimitNode.java
@@ -1,6 +1,6 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * LimitNode is used to select top n result. It uses the default order of upstream nodes
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/OffsetNode.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/OffsetNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/OffsetNode.java
index 134de78..e70a766 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/OffsetNode.java
@@ -1,6 +1,6 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of upstream nodes
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/ProcessNode.java
similarity index 50%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/ProcessNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/ProcessNode.java
index f7bb6ec..17ce248 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/ProcessNode.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
 import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 public class ProcessNode extends PlanNode<TsBlock> {
     public ProcessNode(PlanNodeId id) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/RowBasedSeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/RowBasedSeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
index 83b9b7f..18b012e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/RowBasedSeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 import java.util.List;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/SortNode.java
similarity index 80%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/SortNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/SortNode.java
index f144712..430df2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/SortNode.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
 import org.apache.iotdb.db.query.mpp.common.OrderBy;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * In general, the parameter in sortNode should be pushed down to the upstream operators.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/TimeJoinNode.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/TimeJoinNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/TimeJoinNode.java
index cff9f16..c37704a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/TimeJoinNode.java
@@ -1,10 +1,10 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
 import org.apache.iotdb.db.query.mpp.common.OrderBy;
 import org.apache.iotdb.db.query.mpp.common.TsBlock;
 import org.apache.iotdb.db.query.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.query.mpp.plan.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 import java.util.Arrays;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/WithoutNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/WithoutNode.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/WithoutNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/WithoutNode.java
index 7ae1a70..04b0380 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/process/WithoutNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/process/WithoutNode.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.mpp.plan.process;
+package org.apache.iotdb.db.query.mpp.plan.node.process;
 
 import org.apache.iotdb.db.query.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * WithoutNode is used to discard specific rows from upstream node.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/CsvSinkNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/CsvSinkNode.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/CsvSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/CsvSinkNode.java
index d46d9a0..809ff37 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/CsvSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/CsvSinkNode.java
@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.mpp.plan.sink;
+package org.apache.iotdb.db.query.mpp.plan.node.sink;
 
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 public class CsvSinkNode extends SinkNode {
   public CsvSinkNode(PlanNodeId id) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/FragmentSinkNode.java
similarity index 67%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/FragmentSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/FragmentSinkNode.java
index d35281d..ce0bd3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/FragmentSinkNode.java
@@ -1,6 +1,6 @@
-package org.apache.iotdb.db.query.mpp.plan.sink;
+package org.apache.iotdb.db.query.mpp.plan.node.sink;
 
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 public class FragmentSinkNode extends SinkNode {
     public FragmentSinkNode(PlanNodeId id) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/SinkNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/SinkNode.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/SinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/SinkNode.java
index 6d757c7..31fa7ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/SinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/SinkNode.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.mpp.plan.sink;
+package org.apache.iotdb.db.query.mpp.plan.node.sink;
 
 import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/ThriftSinkNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/ThriftSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java
index 6dd1afd..c71ce9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/sink/ThriftSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/sink/ThriftSinkNode.java
@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.mpp.plan.sink;
+package org.apache.iotdb.db.query.mpp.plan.node.sink;
 
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * not implemented in current IoTDB yet
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/CsvSourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
index c2d77f2..f6dc714 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/CsvSourceNode.java
@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.mpp.plan.source;
+package org.apache.iotdb.db.query.mpp.plan.node.source;
 
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 /**
  * Not implemented in current version.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
index a773e85..3b19af1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesAggregateNode.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.query.mpp.plan.source;
+package org.apache.iotdb.db.query.mpp.plan.node.source;
 
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 /**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SeriesScanNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
index 7bc4972..8ecb6de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SeriesScanNode.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.mpp.plan.source;
+package org.apache.iotdb.db.query.mpp.plan.node.source;
 
 import org.apache.iotdb.db.query.mpp.common.OrderBy;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SourceNode.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SourceNode.java
index d207d26..9ee8ff7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/node/source/SourceNode.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.mpp.plan.source;
+package org.apache.iotdb.db.query.mpp.plan.node.source;
 
 import org.apache.iotdb.db.query.mpp.common.TsBlock;
-import org.apache.iotdb.db.query.mpp.plan.PlanNode;
-import org.apache.iotdb.db.query.mpp.plan.PlanNodeId;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNodeId;
 
 public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable{
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/optimzation/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/optimzation/PlanOptimizer.java
new file mode 100644
index 0000000..8da75d0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/mpp/plan/optimzation/PlanOptimizer.java
@@ -0,0 +1,9 @@
+package org.apache.iotdb.db.query.mpp.plan.optimzation;
+
+import org.apache.iotdb.db.query.mpp.common.QueryContext;
+import org.apache.iotdb.db.query.mpp.common.TsBlock;
+import org.apache.iotdb.db.query.mpp.plan.node.PlanNode;
+
+public interface PlanOptimizer {
+    PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
+}