You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/20 06:33:19 UTC

[iotdb] branch master updated: [IOTDB-2767] Add Operators and PlanVisitor definitions for mpp (#5286)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c83ccfa  [IOTDB-2767] Add Operators and PlanVisitor definitions for mpp (#5286)
c83ccfa is described below

commit c83ccfa2b33c9d51572796ce23520e87911c4811
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Sun Mar 20 14:32:35 2022 +0800

    [IOTDB-2767] Add Operators and PlanVisitor definitions for mpp (#5286)
---
 .../{WithoutPolicy.java => FilterNullPolicy.java}  |   2 +-
 .../mpp/common/{TreeNode.java => FragmentId.java}  |  40 +++++--
 .../{QueryId.java => FragmentInstanceId.java}      |  11 +-
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |  90 +++++++++++++-
 .../org/apache/iotdb/db/mpp/common/TsBlock.java    |  25 ++++
 .../TreeNode.java => execution/FragmentInfo.java}  |  33 +++---
 .../db/mpp/execution/FragmentInstanceContext.java  |  67 +++++++++++
 .../db/mpp/execution/FragmentInstanceState.java    |  68 +++++++++++
 .../iotdb/db/mpp/execution/FragmentState.java      |  71 +++++++++++
 .../iotdb/db/mpp/execution/QueryExecution.java     |  10 +-
 .../ClusterScheduler.java}                         |  34 +++++-
 .../scheduler/IScheduler.java}                     |  30 ++---
 .../execution/scheduler/StandaloneScheduler.java   |  56 +++++++++
 .../iotdb/db/mpp/operator/OperatorContext.java     |   6 +-
 .../process/AggregateOperator.java}                |  35 +++++-
 .../process/DeviceMergeOperator.java}              |  34 +++++-
 .../process/FillOperator.java}                     |  34 +++++-
 .../process/FilterNullOperator.java}               |  35 +++++-
 .../process/GroupByLevelOperator.java}             |  35 +++++-
 .../db/mpp/operator/process/LimitOperator.java     |  75 ++++++++++++
 .../process/OffsetOperator.java}                   |  35 +++++-
 .../process/ProcessOperator.java}                  |  15 +--
 .../process/SortOperator.java}                     |  35 +++++-
 .../process/TimeJoinOperator.java}                 |  35 +++++-
 .../sink/FragmentSinkOperator.java}                |  51 +++++---
 .../sink/SinkOperator.java}                        |  29 +++--
 .../source/SeriesAggregateScanOperator.java}       |  45 ++++---
 .../source/SeriesScanOperator.java}                |  46 ++++---
 .../source/SourceOperator.java}                    |  14 +--
 .../iotdb/db/mpp/plan/FragmentInstanceId.java      |  30 -----
 .../db/mpp/plan/node/process/FilterNullNode.java   |  38 ------
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  | 132 +++++++++++++++++++++
 .../planner/optimization}/PlanOptimizer.java       |   7 +-
 .../planner}/plan/DistributedQueryPlan.java        |   7 +-
 .../planner}/plan/DistributionPlanner.java         |   2 +-
 .../{ => sql/planner}/plan/FragmentInstance.java   |   4 +-
 .../mpp/{ => sql/planner}/plan/LogicalPlanner.java |   4 +-
 .../{ => sql/planner}/plan/LogicalQueryPlan.java   |   7 +-
 .../mpp/{ => sql/planner}/plan/PlanFragment.java   |   7 +-
 .../mpp/{ => sql/planner}/plan/PlanFragmentId.java |   2 +-
 .../mpp/{ => sql/planner}/plan/node/PlanNode.java  |  31 +++--
 .../{ => sql/planner}/plan/node/PlanNodeId.java    |   2 +-
 .../planner}/plan/node/PlanNodeIdAllocator.java    |   2 +-
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  76 ++++++++++++
 .../planner/plan/node/process/AggregateNode.java}  |  49 +++++---
 .../plan/node/process/DeviceMergeNode.java         |  38 ++++--
 .../planner}/plan/node/process/FillNode.java       |  27 ++++-
 .../sql/planner/plan/node/process/FilterNode.java  |  66 +++++++++++
 .../planner/plan/node/process/FilterNullNode.java  |  69 +++++++++++
 .../plan/node/process/GroupByLevelNode.java        |  48 +++++++-
 .../planner}/plan/node/process/LimitNode.java      |  41 +++++--
 .../planner}/plan/node/process/OffsetNode.java     |  39 ++++--
 .../planner}/plan/node/process/ProcessNode.java    |  10 +-
 .../planner}/plan/node/process/SortNode.java       |  35 +++++-
 .../planner}/plan/node/process/TimeJoinNode.java   |  55 ++++++---
 .../planner}/plan/node/sink/CsvSinkNode.java       |  17 ++-
 .../planner}/plan/node/sink/FragmentSinkNode.java  |  17 ++-
 .../{ => sql/planner}/plan/node/sink/SinkNode.java |   9 +-
 .../planner}/plan/node/sink/ThriftSinkNode.java    |  17 ++-
 .../planner}/plan/node/source/CsvSourceNode.java   |  17 ++-
 .../plan/node/source/SeriesAggregateScanNode.java} |  35 +++++-
 .../planner}/plan/node/source/SeriesScanNode.java  |  27 ++++-
 .../planner}/plan/node/source/SourceNode.java      |   9 +-
 .../tree/Expression.java}                          |   7 +-
 64 files changed, 1682 insertions(+), 397 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
similarity index 96%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
index 7a4107e..7979b27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
-public enum WithoutPolicy {
+public enum FilterNullPolicy {
   CONTAINS_NULL,
   ALL_NULL
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
index 4a11358..ef0df6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
@@ -20,22 +20,38 @@ package org.apache.iotdb.db.mpp.common;
 
 import java.util.List;
 
-/**
- * @author A simple class to describe the tree style structure of query executable operators
- * @param <T>
- */
-public class TreeNode<T extends TreeNode<T>> {
-  protected List<T> children;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class FragmentId {
+
+  private final QueryId queryId;
+  private final int id;
+
+  public static FragmentId valueOf(String stageId) {
+    List<String> ids = QueryId.parseDottedId(stageId, 2, "stageId");
+    return valueOf(ids);
+  }
+
+  public static FragmentId valueOf(List<String> ids) {
+    checkArgument(ids.size() == 2, "Expected two ids but got: %s", ids);
+    return new FragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
+  }
+
+  public FragmentId(String queryId, int id) {
+    this(new QueryId(queryId), id);
+  }
 
-  public T getChild(int i) {
-    return hasChild(i) ? children.get(i) : null;
+  public FragmentId(QueryId queryId, int id) {
+    this.queryId = requireNonNull(queryId, "queryId is null");
+    this.id = id;
   }
 
-  public boolean hasChild(int i) {
-    return children.size() > i;
+  public QueryId getQueryId() {
+    return queryId;
   }
 
-  public void addChild(T n) {
-    children.add(n);
+  public int getId() {
+    return id;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
similarity index 86%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index dd8d436..1ab92ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -18,14 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
-public class QueryId {
-  private String Id;
+public class FragmentInstanceId {
 
-  public String getId() {
-    return Id;
-  }
+  private final String fullId;
 
-  public void setId(String id) {
-    Id = id;
+  public FragmentInstanceId(String fullId) {
+    this.fullId = fullId;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
index dd8d436..88289b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
@@ -18,14 +18,96 @@
  */
 package org.apache.iotdb.db.mpp.common;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
 public class QueryId {
-  private String Id;
+
+  private final String id;
+
+  public static QueryId valueOf(String queryId) {
+    // ID is verified in the constructor
+    return new QueryId(queryId);
+  }
+
+  public QueryId(String id) {
+    this.id = validateId(id);
+  }
 
   public String getId() {
-    return Id;
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return id;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    QueryId other = (QueryId) obj;
+    return Objects.equals(this.id, other.id);
+  }
+
+  public static List<String> parseDottedId(String id, int expectedParts, String name) {
+    requireNonNull(id, "id is null");
+    checkArgument(expectedParts > 0, "expectedParts must be at least 1");
+    requireNonNull(name, "name is null");
+
+    List<String> ids = Arrays.asList(id.split("\\."));
+    checkArgument(ids.size() == expectedParts, "Invalid %s %s", name, id);
+
+    for (String part : ids) {
+      checkArgument(!part.isEmpty(), "Invalid id %s", id);
+      checkArgument(isValidId(part), "Invalid id %s", id);
+    }
+    return ids;
+  }
+
+  private static void checkArgument(boolean condition, String message, Object... messageArgs) {
+    if (!condition) {
+      throw new IllegalArgumentException(format(message, messageArgs));
+    }
+  }
+
+  //
+  // Id helper methods
+  //
+
+  // Check if the string matches [_a-z0-9]+ , but without the overhead of regex
+  private static boolean isValidId(String id) {
+    if (id.length() == 0) {
+      return false;
+    }
+
+    for (int i = 0; i < id.length(); i++) {
+      char c = id.charAt(i);
+      if (!(c == '_' || c >= 'a' && c <= 'z' || c >= '0' && c <= '9')) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  public void setId(String id) {
-    Id = id;
+  public static String validateId(String id) {
+    requireNonNull(id, "id is null");
+    checkArgument(!id.isEmpty(), "id is empty");
+    checkArgument(isValidId(id), "Invalid id %s", id);
+    return id;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
index aa40205..7ea49ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
@@ -20,6 +20,8 @@ package org.apache.iotdb.db.mpp.common;
 
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 
+import static java.lang.String.format;
+
 /**
  * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
  * and constructs them as a row based view The columns can be series, aggregation result for one
@@ -33,6 +35,8 @@ public class TsBlock {
   // Describe the column info
   private TsBlockMetadata metadata;
 
+  private int count;
+
   public boolean hasNext() {
     return false;
   }
@@ -45,4 +49,25 @@ public class TsBlock {
   public TsBlockMetadata getMetadata() {
     return metadata;
   }
+
+  public int getCount() {
+    return count;
+  }
+
+  /**
+   * TODO has not been implemented yet
+   *
+   * @param positionOffset start offset
+   * @param length slice length
+   * @return view of current TsBlock starting from positionOffset to positionOffset + length
+   */
+  public TsBlock getRegion(int positionOffset, int length) {
+    if (positionOffset < 0 || length < 0 || positionOffset + length > count) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid position %s and length %s in page with %s positions",
+              positionOffset, length, count));
+    }
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
similarity index 57%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
index 4a11358..7bd9300 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TreeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
@@ -16,26 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 
 import java.util.List;
 
-/**
- * @author A simple class to describe the tree style structure of query executable operators
- * @param <T>
- */
-public class TreeNode<T extends TreeNode<T>> {
-  protected List<T> children;
+public class FragmentInfo {
 
-  public T getChild(int i) {
-    return hasChild(i) ? children.get(i) : null;
-  }
+  private final FragmentId stageId;
+  private final FragmentState state;
+  private final PlanFragment plan;
 
-  public boolean hasChild(int i) {
-    return children.size() > i;
-  }
+  private final List<FragmentInfo> childrenFragments;
 
-  public void addChild(T n) {
-    children.add(n);
+  public FragmentInfo(
+      FragmentId stageId,
+      FragmentState state,
+      PlanFragment plan,
+      List<FragmentInfo> childrenFragments) {
+    this.stageId = stageId;
+    this.state = state;
+    this.plan = plan;
+    this.childrenFragments = childrenFragments;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
new file mode 100644
index 0000000..eb00953
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class FragmentInstanceContext {
+
+  private FragmentInstanceId id;
+
+  // TODO if we split one fragment instance into multiple pipelines to run, we need to replace it
+  // with CopyOnWriteArrayList or some other thread safe data structure
+  private final List<OperatorContext> operatorContexts = new ArrayList<>();
+
+  private final long createNanos = System.nanoTime();
+
+  //    private final GcMonitor gcMonitor;
+  //    private final AtomicLong startNanos = new AtomicLong();
+  //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
+  //    private final AtomicLong startFullGcTimeNanos = new AtomicLong(-1);
+  //    private final AtomicLong endNanos = new AtomicLong();
+  //    private final AtomicLong endFullGcCount = new AtomicLong(-1);
+  //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+
+  public FragmentInstanceContext(FragmentInstanceId id) {
+    this.id = id;
+  }
+
+  public OperatorContext addOperatorContext(
+      int operatorId, PlanNodeId planNodeId, String operatorType) {
+    checkArgument(operatorId >= 0, "operatorId is negative");
+
+    for (OperatorContext operatorContext : operatorContexts) {
+      checkArgument(
+          operatorId != operatorContext.getOperatorId(),
+          "A context already exists for operatorId %s",
+          operatorId);
+    }
+
+    OperatorContext operatorContext = new OperatorContext(operatorId, planNodeId, operatorType);
+    operatorContexts.add(operatorContext);
+    return operatorContext;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
new file mode 100644
index 0000000..a98210b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+public enum FragmentInstanceState {
+  /**
+   * Instance is planned but has not been scheduled yet. An instance will be in the planned state
+   * until the dependencies of the instance have begun producing output.
+   */
+  PLANNED(false),
+  /** Instance is running. */
+  RUNNING(false),
+  /**
+   * Instance has finished executing and output is left to be consumed. In this state, there will be
+   * no new drivers, the existing drivers have finished and the output buffer of the instance is
+   * at least in a 'no-more-tsBlocks' state.
+   */
+  FLUSHING(false),
+  /** Instance has finished executing and all output has been consumed. */
+  FINISHED(true),
+  /** Instance was canceled by a user. */
+  CANCELED(true),
+  /** Instance was aborted due to a failure in the query. The failure was not in this instance. */
+  ABORTED(true),
+  /** Instance execution failed. */
+  FAILED(true);
+
+  public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
+      Stream.of(FragmentInstanceState.values())
+          .filter(FragmentInstanceState::isDone)
+          .collect(toImmutableSet());
+
+  /**
+   * Whether this is a terminal state, i.e. one from which the instance won't transfer to another
+   * state anymore.
+   */
+  private final boolean doneState;
+
+  FragmentInstanceState(boolean doneState) {
+    this.doneState = doneState;
+  }
+
+  /** Is this a terminal state. */
+  public boolean isDone() {
+    return doneState;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentState.java
new file mode 100644
index 0000000..8b596c5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution;
+
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+
+public enum FragmentState {
+
+  /**
+   * Fragment is planned but has not been scheduled yet. A Fragment will be in the planned state
+   * until the dependencies of the Fragment have begun producing output.
+   */
+  PLANNED(false, false),
+  /** Fragment instances are being scheduled on nodes. */
+  SCHEDULING(false, false),
+  /** Fragment is running. */
+  RUNNING(false, false),
+  /**
+   * Fragment has finished executing existing tasks but more instances could be scheduled in the
+   * future.
+   */
+  PENDING(false, false),
+  /** Fragment has finished executing and all output has been consumed. */
+  FINISHED(true, false),
+  /** Fragment was aborted due to a failure in the query. The failure was not in this Fragment. */
+  ABORTED(true, true),
+  /** Fragment execution failed. */
+  FAILED(true, true);
+
+  public static final Set<FragmentState> TERMINAL_FRAGMENT_STATES =
+      Stream.of(FragmentState.values()).filter(FragmentState::isDone).collect(toImmutableSet());
+
+  private final boolean doneState;
+  private final boolean failureState;
+
+  FragmentState(boolean doneState, boolean failureState) {
+    checkArgument(!failureState || doneState, "%s is a non-done failure state", name());
+    this.doneState = doneState;
+    this.failureState = failureState;
+  }
+
+  /** Is this a terminal state. */
+  public boolean isDone() {
+    return doneState;
+  }
+
+  /** Is this a non-success terminal state. */
+  public boolean isFailure() {
+    return failureState;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 2b9d8cf..62a2c65 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -20,8 +20,10 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.mpp.common.Analysis;
 import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.plan.*;
-import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
+import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
+import org.apache.iotdb.db.mpp.sql.planner.optimization.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -34,7 +36,7 @@ import java.util.List;
  */
 public class QueryExecution {
   private QueryContext context;
-  private QueryScheduler scheduler;
+  private IScheduler scheduler;
   private QueryStateMachine stateMachine;
 
   private List<PlanOptimizer> planOptimizers;
@@ -57,7 +59,7 @@ public class QueryExecution {
   }
 
   public void schedule() {
-    this.scheduler = new QueryScheduler(this.stateMachine, this.fragmentInstances);
+    this.scheduler = new ClusterScheduler(this.stateMachine, this.fragmentInstances);
     this.scheduler.start();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
similarity index 65%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index fc5df30..4af8b31 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -16,9 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution;
+package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.mpp.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+
+import io.airlift.units.Duration;
 
 import java.util.List;
 
@@ -29,20 +35,40 @@ import java.util.List;
  * <p>Later, we can add more control logic for a QueryExecution such as retry, kill and so on by
  * this scheduler.
  */
-public class QueryScheduler {
+public class ClusterScheduler implements IScheduler {
   // The stateMachine of the QueryExecution owned by this QueryScheduler
   private QueryStateMachine stateMachine;
 
   // The fragment instances which should be sent to corresponding Nodes.
   private List<FragmentInstance> instances;
 
-  public QueryScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
+  public ClusterScheduler(QueryStateMachine stateMachine, List<FragmentInstance> instances) {
     this.stateMachine = stateMachine;
     this.instances = instances;
   }
 
+  @Override
   public void start() {}
 
+  @Override
+  public void abort() {}
+
+  @Override
+  public Duration getTotalCpuTime() {
+    return null;
+  }
+
+  @Override
+  public FragmentInfo getFragmentInfo() {
+    return null;
+  }
+
+  @Override
+  public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+
+  @Override
+  public void cancelFragment(FragmentId fragmentId) {}
+
   // Send the instances to other nodes
   private void sendFragmentInstances() {}
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
similarity index 61%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
index a4cb88c..16145fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
@@ -16,23 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 
-/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode {
+import io.airlift.units.Duration;
 
-  // The filter
-  private FilterOperator rowFilter;
+public interface IScheduler {
 
-  public FilterNode(PlanNodeId id) {
-    super(id);
-  }
+  void start();
 
-  public FilterNode(PlanNodeId id, FilterOperator rowFilter) {
-    this(id);
-    this.rowFilter = rowFilter;
-  }
+  void abort();
+
+  Duration getTotalCpuTime();
+
+  FragmentInfo getFragmentInfo();
+
+  void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
+
+  void cancelFragment(FragmentId fragmentId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
new file mode 100644
index 0000000..10de6b5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.scheduler;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.metadata.SchemaEngine;
+import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+
+import io.airlift.units.Duration;
+
+public class StandaloneScheduler implements IScheduler {
+
+  private static final StorageEngine STORAGE_ENGINE = StorageEngine.getInstance();
+
+  private static final SchemaEngine SCHEMA_ENGINE = SchemaEngine.getInstance();
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void abort() {}
+
+  @Override
+  public Duration getTotalCpuTime() {
+    return null;
+  }
+
+  @Override
+  public FragmentInfo getFragmentInfo() {
+    return null;
+  }
+
+  @Override
+  public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+
+  @Override
+  public void cancelFragment(FragmentId fragmentId) {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
index c635f74..feedc13 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
 /**
  * Contains information about {@link Operator} execution.
@@ -36,4 +36,8 @@ public class OperatorContext {
     this.planNodeId = planNodeId;
     this.operatorType = operatorType;
   }
+
+  public int getOperatorId() {
+    return operatorId; // exposes the operatorId of this context to callers
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index 63e07b4..6366b29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -16,14 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-public class ProcessNode extends PlanNode<TsBlock> {
-  public ProcessNode(PlanNodeId id) {
-    super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class AggregateOperator implements ProcessOperator { // skeleton, placeholders only
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return ProcessOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close(); // falls back to the inherited default method
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index 63e07b4..408296a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -16,14 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-public class ProcessNode extends PlanNode<TsBlock> {
-  public ProcessNode(PlanNodeId id) {
-    super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class DeviceMergeOperator implements ProcessOperator { // skeleton, placeholders only
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return ProcessOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close(); // falls back to the inherited default method
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
index 63e07b4..66dc3ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FillOperator.java
@@ -16,14 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-public class ProcessNode extends PlanNode<TsBlock> {
-  public ProcessNode(PlanNodeId id) {
-    super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class FillOperator implements ProcessOperator { // skeleton, placeholders only
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return ProcessOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close(); // falls back to the inherited default method
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
index 63e07b4..8d15250 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/FilterNullOperator.java
@@ -16,14 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-public class ProcessNode extends PlanNode<TsBlock> {
-  public ProcessNode(PlanNodeId id) {
-    super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class FilterNullOperator implements ProcessOperator { // skeleton, placeholders only
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return ProcessOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close(); // falls back to the inherited default method
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 63e07b4..10e9daa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -16,14 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-public class ProcessNode extends PlanNode<TsBlock> {
-  public ProcessNode(PlanNodeId id) {
-    super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class GroupByLevelOperator implements ProcessOperator { // skeleton, placeholders only
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return ProcessOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close(); // falls back to the inherited default method
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
new file mode 100644
index 0000000..1ebca2c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.process;
+
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Passes through the blocks of its child until {@code limit} rows have been returned.
+ *
+ * <p>Only a running count of the rows that may still be emitted is kept; no data is buffered.
+ */
+public class LimitOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // rows that may still be returned; shrinks as blocks are handed out
+  private long remainingLimit;
+  private final Operator child;
+
+  /**
+   * @param operatorContext execution context of this operator, must not be null
+   * @param limit maximum number of rows to return, must be at least zero
+   * @param child operator that produces the input blocks, must not be null
+   * @throws NullPointerException if {@code operatorContext} or {@code child} is null
+   * @throws IllegalArgumentException if {@code limit} is negative
+   */
+  public LimitOperator(OperatorContext operatorContext, long limit, Operator child) {
+    this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+    checkArgument(limit >= 0, "limit must be at least zero");
+    this.remainingLimit = limit;
+    this.child = requireNonNull(child, "child operator is null");
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /** Blocking is decided by the child alone, since this operator holds no data of its own. */
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Fetches the next block from the child and cuts it down to the rows the limit still allows.
+   *
+   * @return the child's block, whole or truncated, or null if the child returned null
+   */
+  @Override
+  public TsBlock next() {
+    TsBlock block = child.next();
+    if (block == null) {
+      // a child may hand back null (the placeholder operators do); the limit stays untouched
+      return null;
+    }
+    if (block.getCount() <= remainingLimit) {
+      remainingLimit -= block.getCount();
+      return block;
+    }
+    // more rows than still allowed: request offset 0, length remainingLimit (assumed semantics)
+    TsBlock res = block.getRegion(0, (int) remainingLimit);
+    remainingLimit = 0;
+    return res;
+  }
+
+  /** Reports false as soon as the limit is used up, whatever the child still has to offer. */
+  @Override
+  public boolean hasNext() {
+    return remainingLimit > 0 && child.hasNext();
+  }
+
+  /** Closes the child too, so that the whole subtree below this operator is released. */
+  @Override
+  public void close() throws Exception {
+    try {
+      child.close();
+    } finally {
+      ProcessOperator.super.close();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
index 63e07b4..25b4bc9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/OffsetOperator.java
@@ -16,14 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-public class ProcessNode extends PlanNode<TsBlock> {
-  public ProcessNode(PlanNodeId id) {
-    super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class OffsetOperator implements ProcessOperator { // skeleton, placeholders only
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return ProcessOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close(); // falls back to the inherited default method
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
similarity index 78%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
index dd8d436..aeb9535 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/ProcessOperator.java
@@ -16,16 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.process;
 
-public class QueryId {
-  private String Id;
+import org.apache.iotdb.db.mpp.operator.Operator;
 
-  public String getId() {
-    return Id;
-  }
-
-  public void setId(String id) {
-    Id = id;
-  }
-}
+// TODO: decide which methods ProcessOperator should declare; for now it adds none to Operator
+public interface ProcessOperator extends Operator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
index 63e07b4..0199d52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/SortOperator.java
@@ -16,14 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-public class ProcessNode extends PlanNode<TsBlock> {
-  public ProcessNode(PlanNodeId id) {
-    super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class SortOperator implements ProcessOperator { // skeleton, placeholders only
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return ProcessOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close(); // falls back to the inherited default method
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 63e07b4..11cee59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -16,14 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-public class ProcessNode extends PlanNode<TsBlock> {
-  public ProcessNode(PlanNodeId id) {
-    super(id);
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class TimeJoinOperator implements ProcessOperator { // skeleton, placeholders only
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return ProcessOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close(); // falls back to the inherited default method
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
index aa40205..aca892f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/FragmentSinkOperator.java
@@ -16,33 +16,46 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.sink;
 
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
+import com.google.common.util.concurrent.ListenableFuture;
 
-  // Describe the column info
-  private TsBlockMetadata metadata;
+public class FragmentSinkOperator implements SinkOperator { // skeleton, placeholders only
 
-  public boolean hasNext() {
-    return false;
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return SinkOperator.super.isBlocked(); // falls back to the inherited default method
   }
 
-  // Get next row in current tablet
-  public RowRecord getNext() {
+  @Override
+  public TsBlock next() {
     return null;
   }
 
-  public TsBlockMetadata getMetadata() {
-    return metadata;
+  @Override
+  public boolean hasNext() {
+    return false; // placeholder: never reports data
+  }
+
+  @Override
+  public void close() throws Exception {
+    SinkOperator.super.close(); // falls back to the inherited default method
   }
+
+  @Override
+  public void send(TsBlock tsBlock) {} // no-op placeholder: the block is dropped
+
+  @Override
+  public void setNoMoreTsBlocks() {} // no-op placeholder
+
+  @Override
+  public void abort() {} // no-op placeholder
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
index 9954c74..c3f16ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/sink/SinkOperator.java
@@ -16,19 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.operator.sink;
 
-import org.apache.iotdb.db.mpp.common.QueryContext;
 import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.operator.Operator;
 
-import java.util.List;
+public interface SinkOperator extends Operator { // an Operator that can also be handed tsBlocks
 
-public class DistributedQueryPlan {
-  private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
-  private PlanFragment rootFragment;
+  /**
+   * Sends a tsBlock to an unpartitioned buffer. If no-more-tsBlocks has been set, the send tsBlock
+   * call is ignored. This can happen with limit queries.
+   */
+  void send(TsBlock tsBlock);
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+  /**
+   * Notify SinkHandle that no more tsBlocks will be sent. Any future calls to send a tsBlock are
+   * ignored.
+   */
+  void setNoMoreTsBlocks();
+
+  /**
+   * Abort the sink handle, discarding all tsBlocks still in the memory buffer, but blocking
+   * readers. It is expected that readers will be unblocked when the failed query is cleaned up.
+   */
+  void abort();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index aa40205..fcd4605 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -16,33 +16,42 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.source;
 
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class SeriesAggregateScanOperator implements SourceOperator { // skeleton, placeholders only
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
 
-  // Describe the column info
-  private TsBlockMetadata metadata;
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return SourceOperator.super.isBlocked(); // falls back to the inherited default method
+  }
 
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
+
+  @Override
   public boolean hasNext() {
     return false;
   }
 
-  // Get next row in current tablet
-  public RowRecord getNext() {
-    return null;
+  @Override
+  public void close() throws Exception {
+    SourceOperator.super.close(); // falls back to the inherited default method
   }
 
-  public TsBlockMetadata getMetadata() {
-    return metadata;
+  @Override
+  public PlanNodeId getSourceId() {
+    return null; // placeholder: no source id is held yet
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
index aa40205..c0bc9ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanOperator.java
@@ -16,33 +16,43 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.source;
 
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-/**
- * Intermediate result for most of ExecOperators. The Tablet contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The Tablet also contains the metadata to describe
- * the columns.
- *
- * <p>TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class TsBlock {
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class SeriesScanOperator implements SourceOperator { // skeleton, placeholders only
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return null; // placeholder: no context is held yet
+  }
 
-  // Describe the column info
-  private TsBlockMetadata metadata;
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return SourceOperator.super.isBlocked(); // falls back to the inherited default method
+  }
+
+  @Override
+  public TsBlock next() {
+    return null; // placeholder: yields no data
+  }
 
+  @Override
   public boolean hasNext() {
     return false;
   }
 
-  // Get next row in current tablet
-  public RowRecord getNext() {
-    return null;
+  @Override
+  public void close() throws Exception {
+    SourceOperator.super.close(); // falls back to the inherited default method
   }
 
-  public TsBlockMetadata getMetadata() {
-    return metadata;
+  @Override
+  public PlanNodeId getSourceId() {
+    return null; // placeholder: no source id is held yet
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
similarity index 77%
copy from server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
index dd8d436..8454fd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QueryId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SourceOperator.java
@@ -16,16 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.operator.source;
 
-public class QueryId {
-  private String Id;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-  public String getId() {
-    return Id;
-  }
+public interface SourceOperator extends Operator { // an Operator that also exposes a source id
 
-  public void setId(String id) {
-    Id = id;
-  }
+  PlanNodeId getSourceId(); // NOTE(review): presumably the id of the originating plan node
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
deleted file mode 100644
index 18181cd..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstanceId.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan;
-
-public class FragmentInstanceId {
-  private String id;
-
-  public FragmentInstanceId(String id) {
-    this.id = id;
-  }
-
-  // A SinkOperator is needed here. So that we can know where the result of this instance can be
-  // sent
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
deleted file mode 100644
index d7c463d..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan.node.process;
-
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
-
-/** WithoutNode is used to discard specific rows from upstream node. */
-public class FilterNullNode extends ProcessNode {
-
-  // The policy to discard the result from upstream operator
-  private WithoutPolicy discardPolicy;
-
-  public FilterNullNode(PlanNodeId id) {
-    super(id);
-  }
-
-  public FilterNullNode(PlanNodeId id, WithoutPolicy discardPolicy) {
-    this(id);
-    this.discardPolicy = discardPolicy;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
new file mode 100644
index 0000000..278a80c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner;
+
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import java.util.List;
+
+/**
+ * Used to plan a fragment instance. Currently, we simply convert its PlanNode tree into an
+ * executable Operator tree, but in the future, we may split one fragment instance into multiple
+ * pipelines to run it in parallel and take full advantage of multiple cores.
+ */
+public class LocalExecutionPlanner {
+
+  /** This Visitor is responsible for transforming the PlanNode tree into an Operator tree. */
+  private class Visitor extends PlanVisitor<Operator, LocalExecutionPlanContext> {
+
+    @Override
+    public Operator visitPlan(PlanNode node, LocalExecutionPlanContext context) {
+      throw new UnsupportedOperationException("should call the concrete visitXX() method");
+    }
+
+    @Override
+    public Operator visitSeriesScan(SeriesScanNode node, LocalExecutionPlanContext context) {
+      return super.visitSeriesScan(node, context);
+    }
+
+    @Override
+    public Operator visitSeriesAggregate(
+        SeriesAggregateScanNode node, LocalExecutionPlanContext context) {
+      return super.visitSeriesAggregate(node, context);
+    }
+
+    @Override
+    public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) {
+      return super.visitDeviceMerge(node, context);
+    }
+
+    @Override
+    public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
+      return super.visitFill(node, context);
+    }
+
+    @Override
+    public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) {
+      PlanNode child = node.getChild();
+
+      FilterOperator filterExpression = node.getPredicate();
+      List<String> outputSymbols = node.getOutputColumnNames();
+      return super.visitFilter(node, context);
+    }
+
+    @Override
+    public Operator visitFilterNull(FilterNullNode node, LocalExecutionPlanContext context) {
+      return super.visitFilterNull(node, context);
+    }
+
+    @Override
+    public Operator visitGroupByLevel(GroupByLevelNode node, LocalExecutionPlanContext context) {
+      return super.visitGroupByLevel(node, context);
+    }
+
+    @Override
+    public Operator visitLimit(LimitNode node, LocalExecutionPlanContext context) {
+      Operator child = node.getChild().accept(this, context);
+      return new LimitOperator(
+          context.taskContext.addOperatorContext(
+              context.getNextOperatorId(), node.getId(), LimitOperator.class.getSimpleName()),
+          node.getLimit(),
+          child);
+    }
+
+    @Override
+    public Operator visitOffset(OffsetNode node, LocalExecutionPlanContext context) {
+      return super.visitOffset(node, context);
+    }
+
+    @Override
+    public Operator visitRowBasedSeriesAggregate(
+        AggregateNode node, LocalExecutionPlanContext context) {
+      return super.visitRowBasedSeriesAggregate(node, context);
+    }
+
+    @Override
+    public Operator visitSort(SortNode node, LocalExecutionPlanContext context) {
+      return super.visitSort(node, context);
+    }
+
+    @Override
+    public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
+      return super.visitTimeJoin(node, context);
+    }
+  }
+
+  private static class LocalExecutionPlanContext {
+    private final FragmentInstanceContext taskContext;
+    private int nextOperatorId = 0;
+
+    public LocalExecutionPlanContext(FragmentInstanceContext taskContext) {
+      this.taskContext = taskContext;
+    }
+
+    private int getNextOperatorId() {
+      return nextOperatorId++;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
similarity index 80%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
index b99c5db..b091b91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimzation/PlanOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/optimization/PlanOptimizer.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.optimzation;
+package org.apache.iotdb.db.mpp.sql.planner.optimization;
 
 import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 public interface PlanOptimizer {
-  PlanNode<TsBlock> optimize(PlanNode<TsBlock> plan, QueryContext context);
+  PlanNode optimize(PlanNode plan, QueryContext context);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 9954c74..50835dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -16,17 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 import java.util.List;
 
 public class DistributedQueryPlan {
   private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
+  private PlanNode rootNode;
   private PlanFragment rootFragment;
 
   // TODO: consider whether this field is necessary when do the implementation
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
index 03eb1dc..fc428cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributionPlanner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.Analysis;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 0b405b3..7674f4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 
 public class FragmentInstance {
   private FragmentInstanceId id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
index a74644b..0a45b6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalPlanner.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.Analysis;
 import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.plan.optimzation.PlanOptimizer;
+import org.apache.iotdb.db.mpp.sql.planner.optimization.PlanOptimizer;
 
 import java.util.List;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
index 5094df6..666fbf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/LogicalQueryPlan.java
@@ -16,11 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.QueryContext;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 /**
  * LogicalQueryPlan represents a logical query plan. It stores the root node of corresponding query
@@ -28,5 +27,5 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNode;
  */
 public class LogicalQueryPlan {
   private QueryContext context;
-  private PlanNode<TsBlock> rootNode;
+  private PlanNode rootNode;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index fc49264..0aa3ac5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 // TODO: consider whether it is necessary to make PlanFragment as a TreeNode
 /** PlanFragment contains a sub-query of distributed query. */
 public class PlanFragment {
   private PlanFragmentId id;
-  private PlanNode<TsBlock> root;
+  private PlanNode root;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
index 39f8d17..38a56f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 public class PlanFragmentId {
   private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index 1a3f103..3a3975f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -16,19 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
-import org.apache.iotdb.db.mpp.common.TreeNode;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** The base class of query plan nodes, which are used to compose a logical query plan. */
+// TODO: consider how to restrict the children type for each type of PlanNode
+public abstract class PlanNode {
 
-/**
- * @author xingtanzjr The base class of query executable operators, which is used to compose logical
- *     query plan. TODO: consider how to restrict the children type for each type of ExecOperator
- *     TODO: consider to fix the Template type as TsBlock
- */
-public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
   private PlanNodeId id;
 
-  public PlanNode(PlanNodeId id) {
+  protected PlanNode(PlanNodeId id) {
+    requireNonNull(id, "id is null");
     this.id = id;
   }
+
+  public PlanNodeId getId() {
+    return id;
+  }
+
+  public abstract List<PlanNode> getChildren();
+
+  public abstract List<String> getOutputColumnNames();
+
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitPlan(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
index e5a69e9..f829dfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeId.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
 public class PlanNodeId {
   private String id;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
index 6e70c20..919f303 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeIdAllocator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
 /** A centralized PlanNodeId generator */
 public class PlanNodeIdAllocator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
new file mode 100644
index 0000000..0be251d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+
+public abstract class PlanVisitor<R, C> {
+
+  public abstract R visitPlan(PlanNode node, C context);
+
+  public R visitSeriesScan(SeriesScanNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitSeriesAggregate(SeriesAggregateScanNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitDeviceMerge(DeviceMergeNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitFill(FillNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitFilter(FilterNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitFilterNull(FilterNullNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitGroupByLevel(GroupByLevelNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitLimit(LimitNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitOffset(OffsetNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitRowBasedSeriesAggregate(AggregateNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitSort(SortNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitTimeJoin(TimeJoinNode node, C context) {
+    return visitPlan(node, context);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index 9d7b943..9382fbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -16,20 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 
 import java.util.List;
+import java.util.Map;
 
 /**
- * This node is used to aggregate required series by raw data. The raw data will be input as a
- * TsBlock. This node will output the series aggregated result represented by TsBlock Thus, the
- * columns in output TsBlock will be different from input TsBlock.
+ * This node is used to aggregate required series from multiple sources. The source data will be
+ * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
+ * final series aggregated result represented by TsBlock.
  */
-public class RowBasedSeriesAggregateNode extends ProcessNode {
+public class AggregateNode extends ProcessNode {
   // The parameter of `group by time`
   // Its value will be null if there is no `group by time` clause,
   private GroupByTimeParameter groupByTimeParameter;
@@ -38,22 +41,34 @@ public class RowBasedSeriesAggregateNode extends ProcessNode {
   // result TsBlock
   // (Currently we only support one series in the aggregation function)
   // TODO: need consider whether it is suitable the aggregation function using FunctionExpression
-  private List<FunctionExpression> aggregateFuncList;
+  private Map<String, FunctionExpression> aggregateFuncMap;
 
-  public RowBasedSeriesAggregateNode(PlanNodeId id) {
+  private final List<PlanNode> children;
+  private final List<String> columnNames;
+
+  public AggregateNode(
+      PlanNodeId id,
+      Map<String, FunctionExpression> aggregateFuncMap,
+      List<PlanNode> children,
+      List<String> columnNames) {
     super(id);
+    this.aggregateFuncMap = aggregateFuncMap;
+    this.children = children;
+    this.columnNames = columnNames;
   }
 
-  public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList) {
-    this(id);
-    this.aggregateFuncList = aggregateFuncList;
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
   }
 
-  public RowBasedSeriesAggregateNode(
-      PlanNodeId id,
-      List<FunctionExpression> aggregateFuncList,
-      GroupByTimeParameter groupByTimeParameter) {
-    this(id, aggregateFuncList);
-    this.groupByTimeParameter = groupByTimeParameter;
+  @Override
+  public List<String> getOutputColumnNames() {
+    return columnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitRowBasedSeriesAggregate(this, context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 9269545..54a0cf8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -43,23 +44,42 @@ public class DeviceMergeNode extends ProcessNode {
   // The without policy is able to be push down to the DeviceMergeNode because we can know whether a
   // row contains
   // null or not.
-  private WithoutPolicy withoutPolicy;
+  private FilterNullPolicy filterNullPolicy;
 
   // The map from deviceName to corresponding query result node responsible for that device.
   // DeviceNode means the node whose output TsBlock contains the data belonged to one device.
-  private Map<String, PlanNode<TsBlock>> childDeviceNodeMap;
+  private Map<String, PlanNode> childDeviceNodeMap;
+
+  private List<PlanNode> children;
+
+  private List<String> columnNames;
 
   public DeviceMergeNode(PlanNodeId id) {
     super(id);
   }
 
-  public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) {
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return columnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitDeviceMerge(this, context);
+  }
+
+  public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode> deviceNodeMap) {
     this(id);
     this.childDeviceNodeMap = deviceNodeMap;
     this.children.addAll(deviceNodeMap.values());
   }
 
-  public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) {
+  public void addChildDeviceNode(String deviceName, PlanNode childNode) {
     this.childDeviceNodeMap.put(deviceName, childNode);
     this.children.add(childNode);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
similarity index 64%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 31e57cd..af88895 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.db.mpp.common.FillPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
 
 /** FillNode is used to fill the empty field in one row. */
 public class FillNode extends ProcessNode {
 
+  private PlanNode child;
+
   // The policy to discard the result from upstream node
   private FillPolicy fillPolicy;
 
@@ -31,6 +39,21 @@ public class FillNode extends ProcessNode {
     super(id);
   }
 
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFill(this, context);
+  }
+
   public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
     this(id);
     this.fillPolicy = fillPolicy;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
new file mode 100644
index 0000000..4504890
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/** The FilterNode is responsible for filtering RowRecords from a TsBlock. */
+public class FilterNode extends ProcessNode {
+
+  private final PlanNode child;
+  // TODO we need to rename it to something like expression in order to distinguish from Operator
+  // class
+  private final FilterOperator predicate;
+
+  public FilterNode(PlanNodeId id, PlanNode child, FilterOperator predicate) {
+    super(id);
+    this.child = child;
+    this.predicate = predicate;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFilter(this, context);
+  }
+
+  public FilterOperator getPredicate() {
+    return predicate;
+  }
+
+  public PlanNode getChild() {
+    return child;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
new file mode 100644
index 0000000..fbf8cc9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/** FilterNullNode is used to discard specific rows from the upstream node. */
+public class FilterNullNode extends ProcessNode {
+
+  // The policy to discard the result from upstream operator
+  private FilterNullPolicy discardPolicy;
+
+  private PlanNode child;
+
+  private List<String> filterNullColumnNames;
+
+  public FilterNullNode(PlanNodeId id, PlanNode child) {
+    super(id);
+    this.child = child;
+  }
+
+  public FilterNullNode(PlanNodeId id, PlanNode child, List<String> filterNullColumnNames) {
+    super(id);
+    this.child = child;
+    this.filterNullColumnNames = filterNullColumnNames;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFilterNull(this, context);
+  }
+
+  public void setFilterNullColumnNames(List<String> filterNullColumnNames) {
+    this.filterNullColumnNames = filterNullColumnNames;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index 538d6d8..7dcca76 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -16,9 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
 
 /**
  * This node is responsible for the final aggregation merge operation. It will process the data from
@@ -32,10 +36,48 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
  */
 public class GroupByLevelNode extends ProcessNode {
 
+  private PlanNode child;
+
   private int[] groupByLevels;
 
-  public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) {
+  private List<String> columnNames;
+
+  public GroupByLevelNode(
+      PlanNodeId id, PlanNode child, int[] groupByLevels, List<String> columnNames) {
     super(id);
+    this.child = child;
+    this.groupByLevels = groupByLevels;
+    this.columnNames = columnNames;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return java.util.Collections.singletonList(child); // the direct child, not its children
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return columnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitGroupByLevel(this, context);
+  }
+
+  public int[] getGroupByLevels() {
+    return groupByLevels;
+  }
+
+  public void setGroupByLevels(int[] groupByLevels) {
     this.groupByLevels = groupByLevels;
   }
+
+  public List<String> getColumnNames() {
+    return columnNames;
+  }
+
+  public void setColumnNames(List<String> columnNames) {
+    this.columnNames = columnNames;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
similarity index 53%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 9596c1a..8fdc9b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -16,22 +16,49 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
 
 /** LimitNode is used to select top n result. It uses the default order of upstream nodes */
 public class LimitNode extends ProcessNode {
 
   // The limit count
-  private int limit;
+  private final int limit;
+  private final PlanNode child;
 
-  public LimitNode(PlanNodeId id) {
+  public LimitNode(PlanNodeId id, int limit, PlanNode child) {
     super(id);
+    this.limit = limit;
+    this.child = child;
   }
 
-  public LimitNode(PlanNodeId id, int limit) {
-    this(id);
-    this.limit = limit;
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitLimit(this, context);
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public PlanNode getChild() {
+    return child;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
similarity index 56%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 01e0e93..2e3fc78 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -16,9 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import java.util.List;
 
 /**
  * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
@@ -27,14 +31,35 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
 public class OffsetNode extends ProcessNode {
 
   // The limit count
-  private int offset;
+  private final PlanNode child;
+  private final int offset;
 
-  public OffsetNode(PlanNodeId id) {
+  public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
     super(id);
+    this.child = child;
+    this.offset = offset;
   }
 
-  public OffsetNode(PlanNodeId id, int offset) {
-    this(id);
-    this.offset = offset;
+  @Override
+  public List<PlanNode> getChildren() {
+    return java.util.Collections.singletonList(child); // the single upstream node, never null
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames(); // skipping rows leaves the child's columns unchanged
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitOffset(this, context);
+  }
+
+  public PlanNode getChild() {
+    return child;
+  }
+
+  public int getOffset() {
+    return offset;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
index 63e07b4..9c1fec5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ProcessNode.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+public abstract class ProcessNode extends PlanNode {
 
-public class ProcessNode extends PlanNode<TsBlock> {
   public ProcessNode(PlanNodeId id) {
     super(id);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
similarity index 57%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index 1e83783..19464a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -16,10 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
 
 /**
  * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
@@ -27,14 +33,31 @@ import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
  */
 public class SortNode extends ProcessNode {
 
+  private final PlanNode child;
+
+  private final List<String> orderBy;
+
   private OrderBy sortOrder;
 
-  public SortNode(PlanNodeId id) {
+  public SortNode(PlanNodeId id, PlanNode child, List<String> orderBy, OrderBy sortOrder) {
     super(id);
+    this.child = child;
+    this.orderBy = orderBy;
+    this.sortOrder = sortOrder;
   }
 
-  public SortNode(PlanNodeId id, OrderBy sortOrder) {
-    this(id);
-    this.sortOrder = sortOrder;
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSort(this, context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index ab48cc4..74fac14 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -16,15 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.process;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.common.WithoutPolicy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 
-import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * TimeJoinOperator is responsible for join two or more TsBlock. The join algorithm is like outer
@@ -41,27 +42,51 @@ public class TimeJoinNode extends ProcessNode {
   // The without policy is able to be push down to the TimeJoinOperator because we can know whether
   // a row contains
   // null or not.
-  private WithoutPolicy withoutPolicy;
+  private FilterNullPolicy filterNullPolicy;
 
-  public TimeJoinNode(PlanNodeId id) {
+  private List<PlanNode> children;
+
+  public TimeJoinNode(
+      PlanNodeId id,
+      OrderBy mergeOrder,
+      FilterNullPolicy filterNullPolicy,
+      List<PlanNode> children) {
     super(id);
-    this.mergeOrder = OrderBy.TIMESTAMP_ASC;
+    this.mergeOrder = mergeOrder;
+    this.filterNullPolicy = filterNullPolicy;
+    this.children = children;
   }
 
-  public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) {
-    super(id);
-    this.children.addAll(Arrays.asList(children));
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return children.stream()
+        .flatMap(child -> child.getOutputColumnNames().stream())
+        .collect(Collectors.toList());
   }
 
-  public void addChild(PlanNode<TsBlock> child) {
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitTimeJoin(this, context);
+  }
+
+  public void addChild(PlanNode child) {
     this.children.add(child);
   }
 
+  public void setChildren(List<PlanNode> children) {
+    this.children = children;
+  }
+
   public void setMergeOrder(OrderBy mergeOrder) {
     this.mergeOrder = mergeOrder;
   }
 
-  public void setWithoutPolicy(WithoutPolicy withoutPolicy) {
-    this.withoutPolicy = withoutPolicy;
+  public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
+    this.filterNullPolicy = filterNullPolicy;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
index 0bac6bf..2fddbdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/CsvSinkNode.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
 
 public class CsvSinkNode extends SinkNode {
   public CsvSinkNode(PlanNodeId id) {
@@ -26,6 +29,16 @@ public class CsvSinkNode extends SinkNode {
   }
 
   @Override
+  public List<PlanNode> getChildren() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 58c3a71..da30223 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
 
 public class FragmentSinkNode extends SinkNode {
   public FragmentSinkNode(PlanNodeId id) {
@@ -26,6 +29,16 @@ public class FragmentSinkNode extends SinkNode {
   }
 
   @Override
+  public List<PlanNode> getChildren() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
   public void send() {}
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
index f59effb..e01a2c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/SinkNode.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable {
+public abstract class SinkNode extends PlanNode implements AutoCloseable {
 
   public SinkNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
index f5c48df..bb343f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/ThriftSinkNode.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
 
 /** not implemented in current IoTDB yet */
 public class ThriftSinkNode extends SinkNode {
@@ -28,6 +31,16 @@ public class ThriftSinkNode extends SinkNode {
   }
 
   @Override
+  public List<PlanNode> getChildren() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index a2a0fde..612e930 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+
+import java.util.List;
 
 /** Not implemented in current version. */
 public class CsvSourceNode extends SourceNode {
@@ -28,6 +31,16 @@ public class CsvSourceNode extends SourceNode {
   }
 
   @Override
+  public List<PlanNode> getChildren() {
+    return java.util.Collections.emptyList(); // no children, like the other source nodes
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 80ea58f..ee01ac1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -16,13 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
 /**
  * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. It will
  * read the target series and calculate the aggregation result by the aggregation digest or raw data
@@ -38,7 +44,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  * represent the whole aggregation result of this series. And the timestamp will be 0, which is
  * meaningless.
  */
-public class SeriesAggregateNode extends SourceNode {
+public class SeriesAggregateScanNode extends SourceNode {
 
   // The parameter of `group by time`
   // Its value will be null if there is no `group by time` clause,
@@ -51,16 +57,28 @@ public class SeriesAggregateNode extends SourceNode {
 
   private Filter filter;
 
-  public SeriesAggregateNode(PlanNodeId id) {
+  private String columnName;
+
+  public SeriesAggregateScanNode(PlanNodeId id) {
     super(id);
   }
 
-  public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) {
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return ImmutableList.of(columnName);
+  }
+
+  public SeriesAggregateScanNode(PlanNodeId id, FunctionExpression aggregateFunc) {
     this(id);
     this.aggregateFunc = aggregateFunc;
   }
 
-  public SeriesAggregateNode(
+  public SeriesAggregateScanNode(
       PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
     this(id, aggregateFunc);
     this.groupByTimeParameter = groupByTimeParameter;
@@ -72,6 +90,11 @@ public class SeriesAggregateNode extends SourceNode {
   @Override
   public void close() throws Exception {}
 
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSeriesAggregate(this, context);
+  }
+
   // This method is used when do the PredicatePushDown.
   // The filter is not put in the constructor because the filter is only clear in the predicate
   // push-down stage
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index ccfae5c..8858074 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -16,13 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.OrderBy;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
 /**
  * SeriesScanOperator is responsible for read data a specific series. When reading data, the
  * SeriesScanOperator can read the raw data batch by batch. And also, it can leverage the filter and
@@ -52,6 +58,8 @@ public class SeriesScanNode extends SourceNode {
   // offset for result set. The default value is 0
   private int offset;
 
+  private String columnName;
+
   public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
@@ -82,4 +90,19 @@ public class SeriesScanNode extends SourceNode {
   public void setOffset(int offset) {
     this.offset = offset;
   }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return ImmutableList.of(columnName);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSeriesScan(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index c83da97..551e9d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.node.source;
+package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
-public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable {
+public abstract class SourceNode extends PlanNode implements AutoCloseable {
 
   public SourceNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
index 7a4107e..a75d326 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/WithoutPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/Expression.java
@@ -16,9 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.tree;
 
-public enum WithoutPolicy {
-  CONTAINS_NULL,
-  ALL_NULL
-}
+public class Expression {}