You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/18 10:38:14 UTC

[iotdb] branch mpp-ty updated: partial operator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mpp-ty
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/mpp-ty by this push:
     new b877352  partial operator
b877352 is described below

commit b8773527c817b2442142dc5bfb51138523c5cbff
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri Mar 18 18:37:23 2022 +0800

    partial operator
---
 .../{WithoutPolicy.java => FilterNullPolicy.java}  |   2 +-
 .../common/{WithoutPolicy.java => InstanceId.java} |  10 +-
 .../iotdb/db/mpp/execution/InstanceContext.java    |  44 +++++++
 .../iotdb/db/mpp/execution/InstanceState.java      |  77 +++++++++++++
 .../iotdb/db/mpp/execution/QueryExecution.java     |   4 +-
 .../iotdb/db/mpp/execution/QueryScheduler.java     |   2 +-
 .../executor/InstanceExecutor.java}                |  28 ++---
 .../executor/InstanceHandle.java}                  |  19 ++--
 .../iotdb/db/mpp/operator/OperatorContext.java     |   2 +-
 .../process/AggregateOperator.java}                |  37 ++++--
 .../process/DeviceMergeOperator.java}              |  36 ++++--
 .../process/FillOperator.java}                     |  36 ++++--
 .../process/FilterNullOperator.java}               |  37 ++++--
 .../process/GroupByLevelOperator.java}             |  37 ++++--
 .../process/LimitOperator.java}                    |  36 ++++--
 .../process/OffsetOperator.java}                   |  37 ++++--
 .../process/ProcessOperator.java}                  |  11 +-
 .../process/SortOperator.java}                     |  37 ++++--
 .../process/TimeJoinOperator.java}                 |  37 ++++--
 .../sink/SinkOperator.java}                        |  34 +++---
 .../source/SeriesAggregateScanOperator.java}       |  42 +++++--
 .../source/SeriesScanOperator.java}                |  43 +++++--
 .../source/SourceOperator.java}                    |  12 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 126 +++++++++++++++++++++
 .../planner/optimization}/PlanOptimizer.java       |   4 +-
 .../planner}/plan/DistributedQueryPlan.java        |   6 +-
 .../planner}/plan/DistributionPlanner.java         |   2 +-
 .../{ => sql/planner}/plan/FragmentInstance.java   |   2 +-
 .../{ => sql/planner}/plan/FragmentInstanceId.java |   2 +-
 .../mpp/{ => sql/planner}/plan/LogicalPlanner.java |   4 +-
 .../{ => sql/planner}/plan/LogicalQueryPlan.java   |   6 +-
 .../mpp/{ => sql/planner}/plan/PlanFragment.java   |   6 +-
 .../mpp/{ => sql/planner}/plan/PlanFragmentId.java |   2 +-
 .../mpp/{ => sql/planner}/plan/node/PlanNode.java  |  30 +++--
 .../{ => sql/planner}/plan/node/PlanNodeId.java    |   2 +-
 .../planner}/plan/node/PlanNodeIdAllocator.java    |   2 +-
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  76 +++++++++++++
 .../planner/plan/node/process/AggregateNode.java}  |  47 +++++---
 .../plan/node/process/DeviceMergeNode.java         |  38 +++++--
 .../planner}/plan/node/process/FillNode.java       |  26 ++++-
 .../sql/planner/plan/node/process/FilterNode.java  |  64 +++++++++++
 .../planner/plan/node/process/FilterNullNode.java  |  70 ++++++++++++
 .../plan/node/process/GroupByLevelNode.java        |  47 +++++++-
 .../planner}/plan/node/process/LimitNode.java      |  40 +++++--
 .../planner}/plan/node/process/OffsetNode.java     |  39 +++++--
 .../planner}/plan/node/process/ProcessNode.java    |   9 +-
 .../planner}/plan/node/process/SortNode.java       |  34 +++++-
 .../planner}/plan/node/process/TimeJoinNode.java   |  50 +++++---
 .../planner}/plan/node/sink/CsvSinkNode.java       |  17 ++-
 .../planner}/plan/node/sink/FragmentSinkNode.java  |  17 ++-
 .../{ => sql/planner}/plan/node/sink/SinkNode.java |   8 +-
 .../planner}/plan/node/sink/ThriftSinkNode.java    |  17 ++-
 .../planner}/plan/node/source/CsvSourceNode.java   |  17 ++-
 .../plan/node/source/SeriesAggregateScanNode.java} |  34 +++++-
 .../planner}/plan/node/source/SeriesScanNode.java  |  26 ++++-
 .../planner}/plan/node/source/SourceNode.java      |   8 +-
 .../tree/Expression.java}                          |   7 +-
 57 files changed, 1253 insertions(+), 292 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
similarity index 96%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
index 7a4107e..7979b27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
-public enum WithoutPolicy {
+public enum FilterNullPolicy {
   CONTAINS_NULL,
   ALL_NULL
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/InstanceId.java
similarity index 86%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/InstanceId.java
index 7a4107e..a3dd91f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/InstanceId.java
@@ -18,7 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
-public enum WithoutPolicy {
-  CONTAINS_NULL,
-  ALL_NULL
+public class InstanceId {
+
+    private final String fullId;
+
+    public InstanceId(String fullId) {
+        this.fullId = fullId;
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceContext.java
new file mode 100644
index 0000000..b6d27f5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.InstanceId;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class InstanceContext {
+
+    private InstanceId id;
+
+    private final long createNanos = System.nanoTime();
+
+//    private final GcMonitor gcMonitor;
+//    private final AtomicLong startNanos = new AtomicLong();
+//    private final AtomicLong startFullGcCount = new AtomicLong(-1);
+//    private final AtomicLong startFullGcTimeNanos = new AtomicLong(-1);
+//    private final AtomicLong endNanos = new AtomicLong();
+//    private final AtomicLong endFullGcCount = new AtomicLong(-1);
+//    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+
+
+    public InstanceContext(InstanceId id) {
+        this.id = id;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceState.java
new file mode 100644
index 0000000..847a6e7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/InstanceState.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+public enum InstanceState {
+    /**
+     * Instance is planned but has not been scheduled yet. An instance will
+     * be in the planned state until the dependencies of the instance
+     * have begun producing output.
+     */
+    PLANNED(false),
+    /**
+     * Instance is running.
+     */
+    RUNNING(false),
+    /**
+     * Instance has finished executing and output is left to be consumed.
+     * In this state, there will be no new drivers, the existing drivers have finished
+     * and the output buffer of the instance is at least in a 'no-more-tsBlocks' state.
+     */
+    FLUSHING(false),
+    /**
+     * Instance has finished executing and all output has been consumed.
+     */
+    FINISHED(true),
+    /**
+     * Instance was canceled by a user.
+     */
+    CANCELED(true),
+    /**
+     * Instance was aborted due to a failure in the query. The failure
+     * was not in this instance.
+     */
+    ABORTED(true),
+    /**
+     * Instance execution failed.
+     */
+    FAILED(true);
+
+    public static final Set<InstanceState> TERMINAL_TASK_STATES = Stream.of(InstanceState.values()).filter(InstanceState::isDone).collect(toImmutableSet());
+
+    private final boolean doneState;
+
+    InstanceState(boolean doneState)
+    {
+        this.doneState = doneState;
+    }
+
+    /**
+     * Returns whether this is a terminal state.
+     */
+    public boolean isDone()
+    {
+        return doneState;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 2b9d8cf..9852350 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.Analysis;
 import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.plan.*;
-import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.optimization.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
 
 import java.nio.ByteBuffer;
 import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
index fc5df30..d215c7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.db.mpp.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import java.util.List;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceExecutor.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceExecutor.java
index a4cb88c..8e819ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceExecutor.java
@@ -16,23 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.execution.executor;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.iotdb.db.mpp.execution.ExecFragmentInstance;
 
-/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode {
 
-  // The filter
-  private FilterOperator rowFilter;
+public class InstanceExecutor {
 
-  public FilterNode(PlanNodeId id) {
-    super(id);
-  }
+    /**
+     * TODO naive implementation, should be replaced later
+     * @param instance executable fragment instance
+     * @param handle instance handle
+     * @return a ListenableFuture indicating the instance's end state
+     */
+    public ListenableFuture<Void> enqueueInstance(ExecFragmentInstance instance, InstanceHandle handle) {
+        return SettableFuture.create();
+    }
 
-  public FilterNode(PlanNodeId id, FilterOperator rowFilter) {
-    this(id);
-    this.rowFilter = rowFilter;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceHandle.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceHandle.java
index 58c3a71..08444e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/InstanceHandle.java
@@ -16,18 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.execution.executor;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.common.InstanceId;
 
-public class FragmentSinkNode extends SinkNode {
-  public FragmentSinkNode(PlanNodeId id) {
-    super(id);
-  }
 
-  @Override
-  public void send() {}
+// TODO: should contain more fields; add them as needed
+public class InstanceHandle {
 
-  @Override
-  public void close() throws Exception {}
+    private final InstanceId taskId;
+
+    public InstanceHandle(InstanceId taskId) {
+        this.taskId = taskId;
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
index c635f74..472fa05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
 /**
  * Contains information about {@link Operator} execution.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index 9954c74..9604721 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -16,19 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class AggregateOperator implements ProcessOperator {
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
+
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index 9954c74..dab4df5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -16,19 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class DeviceMergeOperator implements ProcessOperator {
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
index 9954c74..3b35646 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
@@ -16,19 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class FillOperator implements ProcessOperator {
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
index 9954c74..f0a707c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
@@ -16,19 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class FilterNullOperator implements ProcessOperator {
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
+
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 9954c74..435e016 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -16,19 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class GroupByLevelOperator implements ProcessOperator {
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
+
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index 9954c74..60d68ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -16,19 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class LimitOperator implements ProcessOperator {
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
index 9954c74..08f971b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
@@ -16,19 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class OffsetOperator implements ProcessOperator {
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
+
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
similarity index 78%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
index 39f8d17..320e505 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-public class PlanFragmentId {
-  private String id;
+import org.apache.iotdb.db.mpp.operator.Operator;
+
+// TODO: think about what interfaces this ProcessOperator should have
+public interface ProcessOperator extends Operator {
 
-  public PlanFragmentId(String id) {
-    this.id = id;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
index 9954c74..45b5da4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
@@ -16,19 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class SortOperator implements ProcessOperator {
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
+
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 9954c74..0c95752 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -16,19 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-import java.util.List;
+public class TimeJoinOperator implements ProcessOperator {
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return ProcessOperator.super.isBlocked();
+    }
+
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        ProcessOperator.super.close();
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
similarity index 51%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
index d7c463d..7f75c4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
@@ -16,23 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.sink;
 
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.Operator;
 
-/** WithoutNode is used to discard specific rows from upstream node. */
-public class FilterNullNode extends ProcessNode {
+import java.nio.ByteBuffer;
 
-  // The policy to discard the result from upstream operator
-  private WithoutPolicy discardPolicy;
+public interface SinkOperator extends Operator {
 
-  public FilterNullNode(PlanNodeId id) {
-    super(id);
-  }
+    /**
+     * Sends a tsBlock to an unpartitioned buffer. If no-more-tsBlocks has been set, the send tsBlock
+     * call is ignored. This can happen with limit queries.
+     */
+    void send(ByteBuffer tsBlock);
 
-  public FilterNullNode(PlanNodeId id, WithoutPolicy discardPolicy) {
-    this(id);
-    this.discardPolicy = discardPolicy;
-  }
+    /**
+     * Notify SinkHandle that no more tsBlocks will be sent. Any future calls to send a tsBlock are
+     * ignored.
+     */
+    void setNoMoreTsBlocks();
+
+    /**
+     * Abort the sink handle, discarding all tsBlocks which may still be in the memory buffer, but
+     * blocking readers. It is expected that readers will be unblocked when the failed query is cleaned up.
+     */
+    void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 9954c74..83c6309 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -16,19 +16,41 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.source;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-import java.util.List;
+public class SeriesAggregateScanOperator implements SourceOperator {
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return SourceOperator.super.isBlocked();
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        SourceOperator.super.close();
+    }
+
+    @Override
+    public PlanNodeId getSourceId() {
+        return null;
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index 9954c74..42085ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -16,19 +16,42 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.source;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-import java.util.List;
+public class SeriesScanOperator implements SourceOperator {
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+    @Override
+    public OperatorContext getOperatorContext() {
+        return null;
+    }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+    @Override
+    public ListenableFuture<Void> isBlocked() {
+        return SourceOperator.super.isBlocked();
+    }
+
+    @Override
+    public TsBlock next() {
+        return null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public void close() throws Exception {
+        SourceOperator.super.close();
+    }
+
+    @Override
+    public PlanNodeId getSourceId() {
+        return null;
+    }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
similarity index 76%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
index 39f8d17..63b9acd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.source;
 
-public class PlanFragmentId {
-  private String id;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-  public PlanFragmentId(String id) {
-    this.id = id;
-  }
+public interface SourceOperator extends Operator {
+
+    PlanNodeId getSourceId();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
new file mode 100644
index 0000000..920c52f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner;
+
+import org.apache.iotdb.db.mpp.execution.InstanceContext;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import java.util.List;
+
+/**
+ * Used to plan a fragment instance. Currently, we simply transform the PlanNode tree into an executable Operator tree,
+ * but in the future, we may split one fragment instance into multiple pipelines to run it in parallel and take full advantage of multiple cores.
+ */
+public class LocalExecutionPlanner {
+
+
+    /**
+     * This Visitor is responsible for transforming the PlanNode tree into an Operator tree.
+     */
+    private class Visitor extends PlanVisitor<Operator, LocalExecutionPlanContext> {
+
+        @Override
+        public Operator visitPlan(PlanNode node, LocalExecutionPlanContext context) {
+            throw new UnsupportedOperationException("should call the concrete visitXX() method");
+        }
+
+        @Override
+        public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext context) {
+            return super.visitSeriesScan(node, context);
+        }
+
+        @Override
+        public Operator visitSeriesAggregate(SeriesAggregateScanNode node, LocalExecutionPlanContext context) {
+            return super.visitSeriesAggregate(node, context);
+        }
+
+        @Override
+        public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) {
+            return super.visitDeviceMerge(node, context);
+        }
+
+        @Override
+        public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
+            return super.visitFill(node, context);
+        }
+
+        @Override
+        public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) {
+            PlanNode child = node.getChild();
+
+            FilterOperator filterExpression = node.getPredicate();
+            List<String> outputSymbols = node.getOutputColumnNames();
+            return super.visitFilter(node, context);
+        }
+
+        @Override
+        public Operator visitFilterNull(FilterNullNode node, LocalExecutionPlanContext context) {
+            return super.visitFilterNull(node, context);
+        }
+
+        @Override
+        public Operator visitGroupByLevel(GroupByLevelNode node, LocalExecutionPlanContext context) {
+            return super.visitGroupByLevel(node, context);
+        }
+
+        @Override
+        public Operator visitLimit(LimitNode node, LocalExecutionPlanContext context) {
+            return super.visitLimit(node, context);
+        }
+
+        @Override
+        public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext context) {
+            return super.visitOffset(node, context);
+        }
+
+        @Override
+        public Operator visitRowBasedSeriesAggregate(AggregateNode node, LocalExecutionPlanContext context) {
+            return super.visitRowBasedSeriesAggregate(node, context);
+        }
+
+        @Override
+        public Operator visitSort(SortNode node, LocalExecutionPlanContext context) {
+            return super.visitSort(node, context);
+        }
+
+        @Override
+        public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
+            return super.visitTimeJoin(node, context);
+        }
+    }
+
+    private static class LocalExecutionPlanContext {
+        private final InstanceContext taskContext;
+        private int nextOperatorId = 0;
+
+        public LocalExecutionPlanContext(InstanceContext taskContext) {
+            this.taskContext = taskContext;
+        }
+
+        private int getNextOperatorId() {
+            return nextOperatorId++;
+        }
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
index b99c5db..f98f014 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.optimzation;
+package org.apache.iotdb.db.mpp.sql.planner.optimization;
 
 import org.apache.iotdb.db.mpp.common.QueryContext;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 public interface PlanOptimizer {
   PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 9954c74..96185bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -16,17 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.QueryContext;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 import java.util.List;
 
 public class DistributedQueryPlan {
   private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
+  private PlanNode rootNode;
   private PlanFragment rootFragment;
 
   // TODO: consider whether this field is necessary when do the implementation
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
index 03eb1dc..fc428cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.Analysis;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 0b405b3..dfdd764 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 public class FragmentInstance {
   private FragmentInstanceId id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstanceId.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstanceId.java
index 18181cd..c47d549 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstanceId.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 public class FragmentInstanceId {
   private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
index a74644b..0a45b6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.Analysis;
 import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.optimization.PlanOptimizer;
 
 import java.util.List;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
index 5094df6..f3dce2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.QueryContext;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 /**
  * LogicalQueryPlan represents a logical query plan. It stores the root node of corresponding query
@@ -28,5 +28,5 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNode;
  */
 public class LogicalQueryPlan {
   private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
+  private PlanNode rootNode;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index fc49264..bf13247 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -16,14 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 // TODO: consider whether it is necessary to make PlanFragment as a TreeNode
 /** PlanFragment contains a sub-query of distributed query. */
 public class PlanFragment {
   private PlanFragmentId id;
-  private PlanNode<TsBlock> root;
+  private PlanNode root;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
index 39f8d17..38a56f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 public class PlanFragmentId {
   private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 1a3f103..583d4a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -16,19 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
-import org.apache.iotdb.db.mpp.common.TreeNode;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
 
 /**
- * @author xingtanzjr The base class of query executable operators, which is used to compose logical
- *     query plan. TODO: consider how to restrict the children type for each type of ExecOperator
- *     TODO: consider to fix the Template type as TsBlock
+ * The base class of query executable operators, which is used to compose a logical query plan.
  */
-public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
+// TODO: consider how to restrict the children type for each type of ExecOperator
+public abstract class PlanNode {
+
   private PlanNodeId id;
 
-  public PlanNode(PlanNodeId id) {
+  protected PlanNode(PlanNodeId id) {
+    requireNonNull(id, "id is null");
     this.id = id;
   }
+
+  public PlanNodeId getId() {
+    return id;
+  }
+
+  public abstract List<PlanNode> getChildren();
+
+  public abstract List<String> getOutputColumnNames();
+
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitPlan(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index 576fd64..426c778 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
 public class PlanNodeId {
   private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
index 6e70c20..919f303 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
 /** A centralized PlanNodeId generator */
 public class PlanNodeIdAllocator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
new file mode 100644
index 0000000..ecbc043
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+
+public abstract class PlanVisitor<R, C> {
+
+    public abstract R visitPlan(PlanNode node, C context);
+
+    public R visitSeriesScan(SeriesScanNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitSeriesAggregate(SeriesAggregateScanNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitDeviceMerge(DeviceMergeNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitFill(FillNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitFilter(FilterNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitFilterNull(FilterNullNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitGroupByLevel(GroupByLevelNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitLimit(LimitNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitOffset(OffsetNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitRowBasedSeriesAggregate(AggregateNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitSort(SortNode node, C context) {
+        return visitPlan(node, context);
+    }
+
+    public R visitTimeJoin(TimeJoinNode node, C context) {
+        return visitPlan(node, context);
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index 9d7b943..4588ea4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -16,20 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 
 import java.util.List;
+import java.util.Map;
 
 /**
- * This node is used to aggregate required series by raw data. The raw data will be input as a
- * TsBlock. This node will output the series aggregated result represented by TsBlock Thus, the
- * columns in output TsBlock will be different from input TsBlock.
+ * This node is used to aggregate required series from multiple sources. The source data is input as a TsBlock,
+ * which may hold either raw data or partial aggregation results.
+ * This node outputs the final aggregated series result, represented as a TsBlock.
  */
-public class RowBasedSeriesAggregateNode extends ProcessNode {
+public class AggregateNode extends ProcessNode {
   // The parameter of `group by time`
   // Its value will be null if there is no `group by time` clause,
   private GroupByTimeParameter groupByTimeParameter;
@@ -38,22 +41,32 @@ public class RowBasedSeriesAggregateNode extends ProcessNode {
   // result TsBlock
   // (Currently we only support one series in the aggregation function)
   // TODO: need consider whether it is suitable the aggregation function using FunctionExpression
-  private List<FunctionExpression> aggregateFuncList;
+  private Map<String, FunctionExpression> aggregateFuncMap;
 
-  public RowBasedSeriesAggregateNode(PlanNodeId id) {
+  private final List<PlanNode> children;
+  private final List<String> columnNames;
+
+  public AggregateNode(PlanNodeId id, Map<String, FunctionExpression> aggregateFuncMap, List<PlanNode> children, List<String> columnNames) {
     super(id);
+    this.aggregateFuncMap = aggregateFuncMap;
+    this.children = children;
+    this.columnNames = columnNames;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
   }
 
-  public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList) {
-    this(id);
-    this.aggregateFuncList = aggregateFuncList;
+  @Override
+  public List<String> getOutputColumnNames() {
+    return columnNames;
   }
 
-  public RowBasedSeriesAggregateNode(
-      PlanNodeId id,
-      List<FunctionExpression> aggregateFuncList,
-      GroupByTimeParameter groupByTimeParameter) {
-    this(id, aggregateFuncList);
-    this.groupByTimeParameter = groupByTimeParameter;
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitRowBasedSeriesAggregate(this, context);
   }
+
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 9269545..1f61e88 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -43,23 +44,42 @@ public class DeviceMergeNode extends ProcessNode {
   // The without policy is able to be push down to the DeviceMergeNode because we can know whether a
   // row contains
   // null or not.
-  private WithoutPolicy withoutPolicy;
+  private FilterNullPolicy filterNullPolicy;
 
   // The map from deviceName to corresponding query result node responsible for that device.
   // DeviceNode means the node whose output TsBlock contains the data belonged to one device.
-  private Map<String, PlanNode<TsBlock>> childDeviceNodeMap;
+  private Map<String, PlanNode> childDeviceNodeMap;
+
+  private List<PlanNode> children = new java.util.ArrayList<>(); // never null: the constructor and addChildDeviceNode append to it
+
+  private List<String> columnNames;
 
   public DeviceMergeNode(PlanNodeId id) {
     super(id);
   }
 
-  public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) {
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return columnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitDeviceMerge(this, context);
+  }
+
+  public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode> deviceNodeMap) {
     this(id);
     this.childDeviceNodeMap = deviceNodeMap;
     this.children.addAll(deviceNodeMap.values());
   }
 
-  public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) {
+  public void addChildDeviceNode(String deviceName, PlanNode childNode) {
     this.childDeviceNodeMap.put(deviceName, childNode);
     this.children.add(childNode);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
similarity index 64%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 31e57cd..29396c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -16,14 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.iotdb.db.mpp.common.FillPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
 
 /** FillNode is used to fill the empty field in one row. */
 public class FillNode extends ProcessNode {
 
+  private PlanNode child;
+
   // The policy to discard the result from upstream node
   private FillPolicy fillPolicy;
 
@@ -31,6 +38,21 @@ public class FillNode extends ProcessNode {
     super(id);
   }
 
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFill(this, context);
+  }
+
   public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
     this(id);
     this.fillPolicy = fillPolicy;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
new file mode 100644
index 0000000..a6108fe
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import java.util.List;
+
+/** The FilterNode is responsible for filtering the RowRecords from a TsBlock. */
+public class FilterNode extends ProcessNode {
+
+  private final PlanNode child;
+  // TODO we need to rename it to something like expression in order to distinguish from Operator class
+  private final FilterOperator predicate;
+
+  public FilterNode(PlanNodeId id, PlanNode child, FilterOperator predicate) {
+    super(id);
+    this.child = child;
+    this.predicate = predicate;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFilter(this, context);
+  }
+
+  public FilterOperator getPredicate() {
+    return predicate;
+  }
+
+  public PlanNode getChild() {
+    return child;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
new file mode 100644
index 0000000..976af21
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import java.util.List;
+
+/** FilterNullNode is used to discard specific rows from the upstream node. */
+public class FilterNullNode extends ProcessNode {
+
+  // The policy to discard the result from upstream operator
+  private FilterNullPolicy discardPolicy;
+
+  private PlanNode child;
+
+  private List<String> filterNullColumnNames;
+
+
+  public FilterNullNode(PlanNodeId id, PlanNode child) {
+    super(id);
+    this.child = child;
+  }
+
+  public FilterNullNode(PlanNodeId id, PlanNode child, List<String> filterNullColumnNames) {
+    super(id);
+    this.child = child;
+    this.filterNullColumnNames = filterNullColumnNames;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFilterNull(this, context);
+  }
+
+  public void setFilterNullColumnNames(List<String> filterNullColumnNames) {
+    this.filterNullColumnNames = filterNullColumnNames;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index 538d6d8..b877933 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -16,9 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
 
 /**
  * This node is responsible for the final aggregation merge operation. It will process the data from
@@ -32,10 +36,47 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
  */
 public class GroupByLevelNode extends ProcessNode {
 
+  private PlanNode child;
+
   private int[] groupByLevels;
 
-  public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) {
+  private List<String> columnNames;
+
+  public GroupByLevelNode(PlanNodeId id, PlanNode child, int[] groupByLevels, List<String> columnNames) {
     super(id);
+    this.child = child;
+    this.groupByLevels = groupByLevels;
+    this.columnNames = columnNames;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() { // the single direct child itself, not the child's own children
+    return java.util.Collections.singletonList(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return columnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitGroupByLevel(this, context);
+  }
+
+  public int[] getGroupByLevels() {
+    return groupByLevels;
+  }
+
+  public void setGroupByLevels(int[] groupByLevels) {
     this.groupByLevels = groupByLevels;
   }
+
+  public List<String> getColumnNames() {
+    return columnNames;
+  }
+
+  public void setColumnNames(List<String> columnNames) {
+    this.columnNames = columnNames;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 9596c1a..67b7203 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -16,22 +16,48 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
 
 /** LimitNode is used to select top n result. It uses the default order of upstream nodes */
 public class LimitNode extends ProcessNode {
 
   // The limit count
-  private int limit;
+  private final int limit;
+  private final PlanNode child;
 
-  public LimitNode(PlanNodeId id) {
+  public LimitNode(PlanNodeId id, int limit, PlanNode child) {
     super(id);
+    this.limit = limit;
+    this.child = child;
   }
 
-  public LimitNode(PlanNodeId id, int limit) {
-    this(id);
-    this.limit = limit;
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitLimit(this, context);
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public PlanNode getChild() {
+    return child;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 01e0e93..2e3fc78 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -16,9 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
 
 /**
  * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
@@ -27,14 +31,35 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 public class OffsetNode extends ProcessNode {
 
   // The limit count
-  private int offset;
+  private final PlanNode child;
+  private final int offset;
 
-  public OffsetNode(PlanNodeId id) {
+  public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
     super(id);
+    this.child = child;
+    this.offset = offset;
   }
 
-  public OffsetNode(PlanNodeId id, int offset) {
-    this(id);
-    this.offset = offset;
+  @Override
+  public List<PlanNode> getChildren() { // single-child node: expose the child so plan traversal continues
+    return java.util.Collections.singletonList(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() { // delegates to the child, as LimitNode and SortNode do
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitOffset(this, context);
+  }
+
+  public PlanNode getChild() {
+    return child;
+  }
+
+  public int getOffset() {
+    return offset;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
index 63e07b4..8348a21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+public abstract class ProcessNode extends PlanNode {
 
-public class ProcessNode extends PlanNode<TsBlock> {
   public ProcessNode(PlanNodeId id) {
     super(id);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
similarity index 57%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 1e83783..49b59a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -16,10 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
 
 /**
  * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
@@ -27,14 +32,31 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
  */
 public class SortNode extends ProcessNode {
 
+  private final PlanNode child;
+
+  private final List<String> orderBy;
+
   private OrderBy sortOrder;
 
-  public SortNode(PlanNodeId id) {
+  public SortNode(PlanNodeId id, PlanNode child, List<String> orderBy, OrderBy sortOrder) {
     super(id);
+    this.child = child;
+    this.orderBy = orderBy;
+    this.sortOrder = sortOrder;
   }
 
-  public SortNode(PlanNodeId id, OrderBy sortOrder) {
-    this(id);
-    this.sortOrder = sortOrder;
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSort(this, context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index ab48cc4..998594c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -16,15 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 
-import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * TimeJoinOperator is responsible for join two or more TsBlock. The join algorithm is like outer
@@ -41,27 +42,46 @@ public class TimeJoinNode extends ProcessNode {
   // The without policy is able to be push down to the TimeJoinOperator because we can know whether
   // a row contains
   // null or not.
-  private WithoutPolicy withoutPolicy;
+  private FilterNullPolicy filterNullPolicy;
 
-  public TimeJoinNode(PlanNodeId id) {
+  private List<PlanNode> children;
+
+
+  public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, FilterNullPolicy filterNullPolicy, List<PlanNode> children) {
     super(id);
-    this.mergeOrder = OrderBy.TIMESTAMP_ASC;
+    this.mergeOrder = mergeOrder;
+    this.filterNullPolicy = filterNullPolicy;
+    this.children = children;
   }
 
-  public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) {
-    super(id);
-    this.children.addAll(Arrays.asList(children));
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return children.stream().flatMap(child -> child.getOutputColumnNames().stream()).collect(Collectors.toList());
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitTimeJoin(this, context);
   }
 
-  public void addChild(PlanNode<TsBlock> child) {
+  public void addChild(PlanNode child) {
     this.children.add(child);
   }
 
+  public void setChildren(List<PlanNode> children) {
+    this.children = children;
+  }
+
   public void setMergeOrder(OrderBy mergeOrder) {
     this.mergeOrder = mergeOrder;
   }
 
-  public void setWithoutPolicy(WithoutPolicy withoutPolicy) {
-    this.withoutPolicy = withoutPolicy;
+  public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
+    this.filterNullPolicy = filterNullPolicy;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
index 0bac6bf..2fddbdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
 
 public class CsvSinkNode extends SinkNode {
   public CsvSinkNode(PlanNodeId id) {
@@ -26,6 +29,16 @@ public class CsvSinkNode extends SinkNode {
   }
 
   @Override
+  public List<PlanNode> getChildren() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 58c3a71..da30223 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
 
 public class FragmentSinkNode extends SinkNode {
   public FragmentSinkNode(PlanNodeId id) {
@@ -26,6 +29,16 @@ public class FragmentSinkNode extends SinkNode {
   }
 
   @Override
+  public List<PlanNode> getChildren() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
   public void send() {}
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
index f59effb..7221fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable {
+public abstract class SinkNode extends PlanNode implements AutoCloseable {
 
   public SinkNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
index f5c48df..bb343f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
 
 /** not implemented in current IoTDB yet */
 public class ThriftSinkNode extends SinkNode {
@@ -28,6 +31,16 @@ public class ThriftSinkNode extends SinkNode {
   }
 
   @Override
+  public List<PlanNode> getChildren() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index a2a0fde..612e930 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
 
 /** Not implemented in current version. */
 public class CsvSourceNode extends SourceNode {
@@ -28,6 +31,16 @@ public class CsvSourceNode extends SourceNode {
   }
 
   @Override
+  public List<PlanNode> getChildren() {
+    return java.util.Collections.emptyList(); // stub node: no children, but never null
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return java.util.Collections.emptyList(); // stub node: no columns, but never null
+  }
+
+  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 80ea58f..6bdf1c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -16,13 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import java.util.List;
+
 /**
  * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. It will
  * read the target series and calculate the aggregation result by the aggregation digest or raw data
@@ -38,7 +43,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  * represent the whole aggregation result of this series. And the timestamp will be 0, which is
  * meaningless.
  */
-public class SeriesAggregateNode extends SourceNode {
+public class SeriesAggregateScanNode extends SourceNode {
 
   // The parameter of `group by time`
   // Its value will be null if there is no `group by time` clause,
@@ -51,16 +56,28 @@ public class SeriesAggregateNode extends SourceNode {
 
   private Filter filter;
 
-  public SeriesAggregateNode(PlanNodeId id) {
+  private String columnName;
+
+  public SeriesAggregateScanNode(PlanNodeId id) {
     super(id);
   }
 
-  public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) {
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(); // always an empty, immutable list for this node
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() { // ImmutableList.of(null) would throw NPE
+    return columnName == null ? ImmutableList.<String>of() : ImmutableList.of(columnName);
+  }
+
+  public SeriesAggregateScanNode(PlanNodeId id, FunctionExpression aggregateFunc) {
     this(id);
     this.aggregateFunc = aggregateFunc;
   }
 
-  public SeriesAggregateNode(
+  public SeriesAggregateScanNode(
       PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
     this(id, aggregateFunc);
     this.groupByTimeParameter = groupByTimeParameter;
@@ -72,6 +89,11 @@ public class SeriesAggregateNode extends SourceNode {
   @Override
   public void close() throws Exception {}
 
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSeriesAggregate(this, context); // hand this node to the visitor
+  }
+
   // This method is used when do the PredicatePushDown.
   // The filter is not put in the constructor because the filter is only clear in the predicate
   // push-down stage
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index ccfae5c..aa02fbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -16,13 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import java.util.List;
+
 /**
  * SeriesScanOperator is responsible for read data a specific series. When reading data, the
  * SeriesScanOperator can read the raw data batch by batch. And also, it can leverage the filter and
@@ -52,6 +57,8 @@ public class SeriesScanNode extends SourceNode {
   // offset for result set. The default value is 0
   private int offset;
 
+  private String columnName;
+
   public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
@@ -82,4 +89,19 @@ public class SeriesScanNode extends SourceNode {
   public void setOffset(int offset) {
     this.offset = offset;
   }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(); // always an empty, immutable list for this node
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() { // ImmutableList.of(null) would throw NPE
+    return columnName == null ? ImmutableList.<String>of() : ImmutableList.of(columnName);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSeriesScan(this, context); // hand this node to the visitor
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index c83da97..ee25e5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable {
+public abstract class SourceNode extends PlanNode implements AutoCloseable {
 
   public SourceNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
index 7a4107e..45decc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
@@ -16,9 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.tree;
+
+public class Expression {
 
-public enum WithoutPolicy {
-  CONTAINS_NULL,
-  ALL_NULL
 }