You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/22 11:10:32 UTC

[iotdb] branch master updated: Refactor attributes in PlanNode (#5616)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d939abfc5e Refactor attributes in PlanNode (#5616)
d939abfc5e is described below

commit d939abfc5e082089f8b15811eb05b1f1a9625737
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Fri Apr 22 19:10:25 2022 +0800

    Refactor attributes in PlanNode (#5616)
---
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |  15 +++
 .../operator/process/merge/SingleColumnMerger.java |   2 +-
 .../source/SeriesAggregateScanOperator.java        |   6 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |   3 -
 .../{plan => }/IFragmentParallelPlaner.java        |   4 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   2 +-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |   5 +-
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java |  10 +-
 .../{plan => }/SimpleFragmentParallelPlanner.java  |   5 +-
 .../{plan => }/WriteFragmentParallelPlanner.java   |   5 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   8 ++
 .../node/metedata/read/DevicesSchemaScanNode.java  |  17 ++++
 .../plan/node/metedata/read/SchemaFetchNode.java   |  17 ++++
 .../plan/node/metedata/read/SchemaMergeNode.java   |  17 ++++
 .../metedata/read/TimeSeriesSchemaScanNode.java    |  17 ++++
 .../node/metedata/write/AlterTimeSeriesNode.java   |  17 ++++
 .../plan/node/metedata/write/AuthorNode.java       |  17 ++++
 .../write/CreateAlignedTimeSeriesNode.java         |  16 +++
 .../node/metedata/write/CreateTimeSeriesNode.java  |  16 +++
 .../planner/plan/node/process/AggregateNode.java   |  31 +++---
 .../planner/plan/node/process/DeviceMergeNode.java |  47 ++++-----
 .../planner/plan/node/process/ExchangeNode.java    |  17 ++++
 .../sql/planner/plan/node/process/FillNode.java    |   9 +-
 .../sql/planner/plan/node/process/FilterNode.java  |  10 +-
 .../planner/plan/node/process/FilterNullNode.java  |  81 +++++++--------
 .../plan/node/process/GroupByLevelNode.java        |  85 +++++++++++-----
 .../sql/planner/plan/node/process/LimitNode.java   |   9 +-
 .../sql/planner/plan/node/process/OffsetNode.java  |   9 +-
 .../sql/planner/plan/node/process/SortNode.java    |  11 +--
 .../planner/plan/node/process/TimeJoinNode.java    | 109 ++++++++++++---------
 .../planner/plan/node/sink/FragmentSinkNode.java   |  17 ++++
 .../plan/node/source/SeriesAggregateScanNode.java  |  37 ++++---
 .../planner/plan/node/source/SeriesScanNode.java   |  15 +--
 .../plan/node/write/InsertMultiTabletsNode.java    |  17 ++++
 .../sql/planner/plan/node/write/InsertRowNode.java |  16 +++
 .../planner/plan/node/write/InsertRowsNode.java    |  17 ++++
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  17 ++++
 .../planner/plan/node/write/InsertTabletNode.java  |  16 +++
 .../Aggregation.java}                              |  21 ++--
 .../planner/plan/parameter/AggregationStep.java    |  73 ++++++++++++++
 .../plan/parameter/FilterNullParameter.java        |  88 +++++++++++++++++
 .../plan/parameter/GroupByTimeParameter.java}      |  81 ++++++++-------
 .../plan/{ => parameter}/InputLocation.java        |  35 ++++++-
 .../planner/plan/{ => parameter}/OutputColumn.java |  24 ++++-
 .../statement/component/GroupByTimeComponent.java  |  73 --------------
 .../query/expression/unary/FunctionExpression.java |   3 +
 .../query/expression/unary/TimeSeriesOperand.java  |   5 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |   2 +-
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |   2 +-
 .../operator/SeriesAggregateScanOperatorTest.java  |  16 +--
 .../db/mpp/operator/SingleColumnMergerTest.java    |   2 +-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |   2 +-
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |   9 +-
 .../node/process/DeviceMergeNodeSerdeTest.java     |   3 -
 .../plan/node/process/ExchangeNodeSerdeTest.java   |   3 -
 .../sql/plan/node/process/FillNodeSerdeTest.java   |   4 -
 .../sql/plan/node/process/FilterNodeSerdeTest.java |   4 -
 .../plan/node/process/FilterNullNodeSerdeTest.java |   9 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |   9 +-
 .../sql/plan/node/process/LimitNodeSerdeTest.java  |   9 +-
 .../sql/plan/node/process/OffsetNodeSerdeTest.java |  17 ++--
 .../sql/plan/node/process/SortNodeSerdeTest.java   |   9 +-
 .../plan/node/process/TimeJoinNodeSerdeTest.java   |  13 +--
 .../source/SeriesAggregateScanNodeSerdeTest.java   |   6 +-
 64 files changed, 879 insertions(+), 412 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
index 0913a495a1..d8a5586362 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/SchemaTree.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.mpp.common.schematree;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -31,6 +33,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
 import static org.apache.iotdb.db.mpp.common.schematree.SchemaNode.SCHEMA_ENTITY_NODE;
@@ -64,6 +67,18 @@ public class SchemaTree {
     return new Pair<>(visitor.getAllResult(), visitor.getNextOffset());
   }
 
+  /**
+   * Get all device paths matching the path pattern.
+   *
+   * @param pathPattern the pattern of the target devices.
+   * @return A HashSet instance which stores devices paths matching the given path pattern.
+   */
+  public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException {
+    // TODO: @zyk
+    throw new NoTemplateOnMNodeException("");
+  }
+
   public DeviceSchemaInfo searchDeviceSchemaInfo(
       PartialPath devicePath, List<String> measurements) {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
index e4e054a0ef..8e3c82cb9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator.process.merge;
 
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 79586e3c4e..b94fadccc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
@@ -85,7 +85,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       List<AggregationType> aggregateFuncList,
       Filter timeFilter,
       boolean ascending,
-      GroupByTimeComponent groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter) {
     this.sourceId = sourceId;
     this.operatorContext = context;
     this.ascending = ascending;
@@ -121,7 +121,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
    * Aggregation query has only one time window and the result set of it does not contain a
    * timestamp, so it doesn't matter what the time range returns.
    */
-  public ITimeRangeIterator initTimeRangeIterator(GroupByTimeComponent groupByTimeParameter) {
+  public ITimeRangeIterator initTimeRangeIterator(GroupByTimeParameter groupByTimeParameter) {
     if (groupByTimeParameter == null) {
       return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index b8a42f462b..67c8c5f740 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -25,12 +25,9 @@ import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
-import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.WriteFragmentParallelPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/IFragmentParallelPlaner.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/IFragmentParallelPlaner.java
index 36be61fa3f..512634ff9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/IFragmentParallelPlaner.java
@@ -17,7 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 
 import java.util.List;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 673efa4490..230ffac797 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.mpp.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.OutputColumn;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.DevicesSchemaScanNode;
@@ -68,6 +67,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index cbaa275805..7d6cbee1fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
@@ -86,9 +85,7 @@ public class LogicalPlanner {
         rootNode = optimizer.optimize(rootNode, context);
       }
 
-      analysis
-          .getRespDatasetHeader()
-          .setColumnToTsBlockIndexMap(((IOutputPlanNode) rootNode).getOutputColumnNames());
+      analysis.getRespDatasetHeader().setColumnToTsBlockIndexMap(rootNode.getOutputColumnNames());
     }
 
     return new LogicalQueryPlan(context, rootNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
index db7497ea61..eae36dc667 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
@@ -37,11 +37,11 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -270,10 +270,10 @@ public class QueryPlanBuilder {
         new FilterNullNode(
             context.getQueryId().genPlanNodeId(),
             this.getRoot(),
-            filterNullComponent.getWithoutPolicyType(),
-            filterNullComponent.getWithoutNullColumns().stream()
-                .map(Expression::getExpressionString)
-                .collect(Collectors.toList()));
+            new FilterNullParameter(
+                filterNullComponent.getWithoutPolicyType(),
+                new ArrayList<>() // TODO: support filtering based on partial columns
+                ));
   }
 
   public void planLimit(int rowLimit) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
index f3413df263..83cffa59cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/SimpleFragmentParallelPlanner.java
@@ -16,12 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/WriteFragmentParallelPlanner.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/WriteFragmentParallelPlanner.java
index be34ae257e..6f28e87f21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/WriteFragmentParallelPlanner.java
@@ -17,10 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
index f019999920..6c3d522a35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNode.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.lang.Validate;
@@ -77,6 +79,12 @@ public abstract class PlanNode {
 
   public abstract int allowedChildCount();
 
+  public abstract List<ColumnHeader> getOutputColumnHeaders();
+
+  public abstract List<String> getOutputColumnNames();
+
+  public abstract List<TSDataType> getOutputColumnTypes();
+
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitPlan(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
index 4afa2fad9b..a66513953a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
@@ -20,9 +20,11 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -61,6 +63,21 @@ public class DevicesSchemaScanNode extends SchemaScanNode {
     return new DevicesSchemaScanNode(getPlanNodeId(), path, limit, offset, isPrefixPath, hasSgCol);
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.DEVICES_SCHEMA_SCAN.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
index 54800d33a1..2afbfa4b95 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
 
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import com.google.common.collect.ImmutableList;
 
@@ -61,6 +63,21 @@ public class SchemaFetchNode extends SchemaScanNode {
     return 0;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.SCHEMA_FETCH.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
index 7e9357cb68..1914825994 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaMergeNode.java
@@ -18,11 +18,13 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
 
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ProcessNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -64,6 +66,21 @@ public class SchemaMergeNode extends ProcessNode {
     return CHILD_COUNT_NO_LIMIT;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.SCHEMA_MERGE.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
index ab5337f8a8..c789cc64f3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
@@ -20,9 +20,11 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -121,6 +123,21 @@ public class TimeSeriesSchemaScanNode extends SchemaScanNode {
         getPlanNodeId(), path, key, value, limit, offset, orderByHeat, isContains, isPrefixPath);
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index c38d0e1452..2df87c8b44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement.AlterType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -136,6 +138,21 @@ public class AlterTimeSeriesNode extends PlanNode {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     byteBuffer.putShort((short) PlanNodeType.ALTER_TIME_SERIES.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
index adcd0cfc06..1f49ea735f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AuthorNode.java
@@ -22,11 +22,13 @@ import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.auth.entity.PrivilegeType;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -164,6 +166,21 @@ public class AuthorNode extends PlanNode {
     return 0;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     byteBuffer.putShort((short) PlanNodeType.AUTHOR.ordinal());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index b6d0a56143..d90e79f8c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -159,6 +160,21 @@ public class CreateAlignedTimeSeriesNode extends PlanNode {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
     return visitor.visitCreateAlignedTimeSeries(this, schemaRegion);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index ddeefcef63..dcbea17148 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -170,6 +171,21 @@ public class CreateTimeSeriesNode extends WritePlanNode {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   public static CreateTimeSeriesNode deserialize(ByteBuffer byteBuffer) {
     String id;
     PartialPath path = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
index d5d018f0ee..ba71b30970 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/AggregateNode.java
@@ -21,12 +21,12 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.Aggregation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -52,19 +52,24 @@ import java.util.stream.Collectors;
  * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
  * final series aggregated result represented by TsBlock.
  */
-public class AggregateNode extends ProcessNode implements IOutputPlanNode {
+public class AggregateNode extends ProcessNode {
 
   // The map from columns to corresponding aggregation functions on that column.
   //    KEY: The index of a column in the input {@link TsBlock}.
   //    VALUE: Aggregation functions on this column.
   // (Currently, we only support one series in the aggregation function.)
-  private final Map<PartialPath, Set<AggregationType>> aggregateFuncMap;
+  @Deprecated private final Map<PartialPath, Set<AggregationType>> aggregateFuncMap;
+
+  // The list of aggregation functions, each Aggregation will be output as one column of result
+  // TsBlock
+  private List<Aggregation> aggregationList;
 
   // The parameter of `group by time`.
   // Its value will be null if there is no `group by time` clause.
-  private final GroupByTimeComponent groupByTimeParameter;
+  private final GroupByTimeParameter groupByTimeParameter;
 
-  private final List<ColumnHeader> columnHeaders = new ArrayList<>();
+  // column name and datatype of each output column
+  private final List<ColumnHeader> outputColumnHeaders = new ArrayList<>();
 
   private PlanNode child;
 
@@ -72,14 +77,14 @@ public class AggregateNode extends ProcessNode implements IOutputPlanNode {
       PlanNodeId id,
       PlanNode child,
       Map<PartialPath, Set<AggregationType>> aggregateFuncMap,
-      GroupByTimeComponent groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter) {
     super(id);
     this.child = child;
     this.aggregateFuncMap = aggregateFuncMap;
     this.groupByTimeParameter = groupByTimeParameter;
     for (Map.Entry<PartialPath, Set<AggregationType>> entry : aggregateFuncMap.entrySet()) {
       PartialPath path = entry.getKey();
-      columnHeaders.addAll(
+      outputColumnHeaders.addAll(
           entry.getValue().stream()
               .map(
                   functionName ->
@@ -111,17 +116,21 @@ public class AggregateNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return columnHeaders;
+    return outputColumnHeaders;
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnType)
+        .collect(Collectors.toList());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 72bdb6cd4c..21c1eae46f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -20,13 +20,11 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -49,24 +47,19 @@ import java.util.stream.Collectors;
  * same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will
  * contain n+1 columns where the new column is Device column.
  */
-public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
+public class DeviceMergeNode extends ProcessNode {
 
   // The result output order that this operator
   private OrderBy mergeOrder;
 
-  // The policy to decide whether a row should be discarded
-  // The without policy is able to be push down to the DeviceMergeNode because we can know whether a
-  // row contains
-  // null or not.
-  private FilterNullComponent filterNullComponent;
-
   // The map from deviceName to corresponding query result node responsible for that device.
   // DeviceNode means the node whose output TsBlock contains the data belonged to one device.
   private Map<String, PlanNode> childDeviceNodeMap = new HashMap<>();
 
-  private List<PlanNode> children;
+  // column name and datatype of each output column
+  private final List<ColumnHeader> outputColumnHeaders = new ArrayList<>();
 
-  private final List<ColumnHeader> columnHeaders = new ArrayList<>();
+  private List<PlanNode> children;
 
   public DeviceMergeNode(PlanNodeId id) {
     super(id);
@@ -79,10 +72,6 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
     this.children = new ArrayList<>();
   }
 
-  public void setFilterNullComponent(FilterNullComponent filterNullComponent) {
-    this.filterNullComponent = filterNullComponent;
-  }
-
   @Override
   public List<PlanNode> getChildren() {
     return children;
@@ -110,28 +99,32 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
   }
 
   private void updateColumnHeaders(PlanNode childNode) {
-    List<ColumnHeader> childColumnHeaders = ((IOutputPlanNode) childNode).getOutputColumnHeaders();
+    List<ColumnHeader> childColumnHeaders = childNode.getOutputColumnHeaders();
     for (ColumnHeader columnHeader : childColumnHeaders) {
       ColumnHeader tmpColumnHeader = columnHeader.replacePathWithMeasurement();
-      if (!columnHeaders.contains(tmpColumnHeader)) {
-        columnHeaders.add(tmpColumnHeader);
+      if (!outputColumnHeaders.contains(tmpColumnHeader)) {
+        outputColumnHeaders.add(tmpColumnHeader);
       }
     }
   }
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return columnHeaders;
+    return outputColumnHeaders;
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnType)
+        .collect(Collectors.toList());
   }
 
   public OrderBy getMergeOrder() {
@@ -147,14 +140,13 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.DEVICE_MERGE.serialize(byteBuffer);
     ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer);
-    filterNullComponent.serialize(byteBuffer);
     ReadWriteIOUtils.write(childDeviceNodeMap.size(), byteBuffer);
     for (Map.Entry<String, PlanNode> e : childDeviceNodeMap.entrySet()) {
       ReadWriteIOUtils.write(e.getKey(), byteBuffer);
       e.getValue().serialize(byteBuffer);
     }
-    ReadWriteIOUtils.write(columnHeaders.size(), byteBuffer);
-    for (ColumnHeader columnHeader : columnHeaders) {
+    ReadWriteIOUtils.write(outputColumnHeaders.size(), byteBuffer);
+    for (ColumnHeader columnHeader : outputColumnHeaders) {
       columnHeader.serialize(byteBuffer);
     }
   }
@@ -162,7 +154,6 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
   public static DeviceMergeNode deserialize(ByteBuffer byteBuffer) {
     int orderByIndex = ReadWriteIOUtils.readInt(byteBuffer);
     OrderBy orderBy = OrderBy.values()[orderByIndex];
-    FilterNullComponent filterNullComponent = FilterNullComponent.deserialize(byteBuffer);
     Map<String, PlanNode> childDeviceNodeMap = new HashMap<>();
     int childDeviceNodeMapSize = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < childDeviceNodeMapSize; i++) {
@@ -177,9 +168,8 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     DeviceMergeNode deviceMergeNode = new DeviceMergeNode(planNodeId, orderBy);
-    deviceMergeNode.filterNullComponent = filterNullComponent;
     deviceMergeNode.childDeviceNodeMap = childDeviceNodeMap;
-    deviceMergeNode.columnHeaders.addAll(columnHeaders);
+    deviceMergeNode.outputColumnHeaders.addAll(columnHeaders);
     return deviceMergeNode;
   }
 
@@ -207,12 +197,11 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
 
     DeviceMergeNode that = (DeviceMergeNode) o;
     return mergeOrder == that.mergeOrder
-        && Objects.equals(filterNullComponent, that.filterNullComponent)
         && Objects.equals(childDeviceNodeMap, that.childDeviceNodeMap);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(mergeOrder, filterNullComponent, childDeviceNodeMap);
+    return Objects.hash(mergeOrder, childDeviceNodeMap);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index dc61098d7a..82998127ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
@@ -86,6 +88,21 @@ public class ExchangeNode extends PlanNode {
     return CHILD_COUNT_NO_LIMIT;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   public void setUpstream(TEndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
     this.upstreamEndpoint = endPoint;
     this.upstreamInstanceId = instanceId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index b0ccf48018..b800b3308a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -38,7 +37,7 @@ import java.util.List;
 import java.util.Objects;
 
 /** FillNode is used to fill the empty field in one row. */
-public class FillNode extends ProcessNode implements IOutputPlanNode {
+public class FillNode extends ProcessNode {
 
   private PlanNode child;
 
@@ -76,17 +75,17 @@ public class FillNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+    return child.getOutputColumnHeaders();
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return ((IOutputPlanNode) child).getOutputColumnNames();
+    return child.getOutputColumnNames();
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return ((IOutputPlanNode) child).getOutputColumnTypes();
+    return child.getOutputColumnTypes();
   }
 
   public FillPolicy getFillPolicy() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 25a117cb6f..0d74ee1092 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -40,7 +39,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 /** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode implements IOutputPlanNode {
+public class FilterNode extends ProcessNode {
 
   private PlanNode child;
 
@@ -58,10 +57,9 @@ public class FilterNode extends ProcessNode implements IOutputPlanNode {
     this(id, predicate);
     this.child = child;
     this.columnHeaders =
-        ((IOutputPlanNode) child)
-            .getOutputColumnHeaders().stream()
-                .filter(columnHeader -> outputColumnNames.contains(columnHeader.getColumnName()))
-                .collect(Collectors.toList());
+        child.getOutputColumnHeaders().stream()
+            .filter(columnHeader -> outputColumnNames.contains(columnHeader.getColumnName()))
+            .collect(Collectors.toList());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index ecde0204d4..3a2201a489 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -20,15 +20,15 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
 
@@ -37,29 +37,36 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-/** WithoutNode is used to discard specific rows from upstream node. */
-public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
+/** FilterNullNode is used to discard specific rows from upstream node. */
+public class FilterNullNode extends ProcessNode {
 
-  // The policy to discard the result from upstream operator
-  private final FilterNullPolicy discardPolicy;
-
-  private final List<String> filterNullColumnNames;
+  FilterNullParameter filterNullParameter;
 
   private PlanNode child;
 
+  public FilterNullNode(PlanNodeId id, FilterNullParameter filterNullParameter) {
+    super(id);
+    this.filterNullParameter = filterNullParameter;
+  }
+
   public FilterNullNode(
-      PlanNodeId id, FilterNullPolicy policy, List<String> filterNullColumnNames) {
+      PlanNodeId id, FilterNullPolicy filterNullPolicy, List<InputLocation> filterNullColumns) {
     super(id);
-    this.discardPolicy = policy;
-    this.filterNullColumnNames = filterNullColumnNames;
+    this.filterNullParameter = new FilterNullParameter(filterNullPolicy, filterNullColumns);
   }
 
   public FilterNullNode(
       PlanNodeId id,
       PlanNode child,
-      FilterNullPolicy discardPolicy,
-      List<String> filterNullColumnNames) {
-    this(id, discardPolicy, filterNullColumnNames);
+      FilterNullPolicy filterNullPolicy,
+      List<InputLocation> filterNullColumns) {
+    super(id);
+    this.child = child;
+    this.filterNullParameter = new FilterNullParameter(filterNullPolicy, filterNullColumns);
+  }
+
+  public FilterNullNode(PlanNodeId id, PlanNode child, FilterNullParameter filterNullParameter) {
+    this(id, filterNullParameter);
     this.child = child;
   }
 
@@ -75,7 +82,7 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public PlanNode clone() {
-    return new FilterNullNode(getPlanNodeId(), discardPolicy, filterNullColumnNames);
+    return new FilterNullNode(getPlanNodeId(), filterNullParameter);
   }
 
   @Override
@@ -85,25 +92,25 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+    return child.getOutputColumnHeaders();
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return ((IOutputPlanNode) child).getOutputColumnNames();
+    return child.getOutputColumnNames();
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return ((IOutputPlanNode) child).getOutputColumnTypes();
+    return child.getOutputColumnTypes();
   }
 
   public FilterNullPolicy getDiscardPolicy() {
-    return discardPolicy;
+    return filterNullParameter.getFilterNullPolicy();
   }
 
-  public List<String> getFilterNullColumnNames() {
-    return filterNullColumnNames;
+  public List<InputLocation> getFilterNullColumns() {
+    return filterNullParameter.getFilterNullColumns();
   }
 
   @Override
@@ -114,30 +121,13 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.FILTER_NULL.serialize(byteBuffer);
-    ReadWriteIOUtils.write(discardPolicy.ordinal(), byteBuffer);
-    if (filterNullColumnNames == null) {
-      ReadWriteIOUtils.write(-1, byteBuffer);
-    } else {
-      ReadWriteIOUtils.write(filterNullColumnNames.size(), byteBuffer);
-      for (String filterNullColumnName : filterNullColumnNames) {
-        ReadWriteIOUtils.write(filterNullColumnName, byteBuffer);
-      }
-    }
+    filterNullParameter.serialize(byteBuffer);
   }
 
   public static FilterNullNode deserialize(ByteBuffer byteBuffer) {
-    FilterNullPolicy filterNullPolicy =
-        FilterNullPolicy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
-    int size = ReadWriteIOUtils.readInt(byteBuffer);
-    List<String> filterNullColumnNames = null;
-    if (size != -1) {
-      filterNullColumnNames = new ArrayList<>();
-      for (int i = 0; i < size; i++) {
-        filterNullColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
-      }
-    }
+    FilterNullParameter filterNullParameter = FilterNullParameter.deserialize(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new FilterNullNode(planNodeId, filterNullPolicy, filterNullColumnNames);
+    return new FilterNullNode(planNodeId, filterNullParameter);
   }
 
   @TestOnly
@@ -145,7 +135,7 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
     String title = String.format("[FilterNullNode (%s)]", this.getPlanNodeId());
     List<String> attributes = new ArrayList<>();
     attributes.add("FilterNullPolicy: " + this.getDiscardPolicy());
-    attributes.add("FilterNullColumnNames: " + this.getFilterNullColumnNames());
+    attributes.add("FilterNullColumns: " + this.getFilterNullColumns());
     return new Pair<>(title, attributes);
   }
 
@@ -160,13 +150,12 @@ public class FilterNullNode extends ProcessNode implements IOutputPlanNode {
     }
 
     FilterNullNode that = (FilterNullNode) o;
-    return discardPolicy == that.discardPolicy
-        && Objects.equals(child, that.child)
-        && Objects.equals(filterNullColumnNames, that.filterNullColumnNames);
+    return Objects.equals(filterNullParameter, that.filterNullParameter)
+        && Objects.equals(child, that.child);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(discardPolicy, child, filterNullColumnNames);
+    return Objects.hash(child, filterNullParameter);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index dc06dec753..45ec5b0aa9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -20,11 +20,12 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.OutputColumn;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -35,7 +36,6 @@ import com.google.common.collect.ImmutableList;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -55,15 +55,25 @@ import java.util.stream.Collectors;
  * <p>If the group by level parameter is [0, 2], then these two columns will not belong to one
  * bucket. And the total buckets are `root.*.d1.s1` and `root.*.d2.s1`
  */
-public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
+@Deprecated // TODO: delete later
+public class GroupByLevelNode extends ProcessNode {
 
   private final int[] groupByLevels;
 
-  private final Map<ColumnHeader, ColumnHeader> groupedPathMap;
+  @Deprecated private Map<ColumnHeader, ColumnHeader> groupedPathMap;
 
-  private PlanNode child;
+  // The list of aggregation functions, each aggregation will be output as one column of result
+  // TsBlock
+  private List<AggregationType> aggregateFuncList = new ArrayList<>();
+
+  // indicate each output column should use which value column of which input TsBlock and the
+  // overlapped situation
+  private List<OutputColumn> outputColumns = new ArrayList<>();
 
-  private final List<ColumnHeader> columnHeaders;
+  // column name and datatype of each output column
+  private List<ColumnHeader> outputColumnHeaders;
+
+  private PlanNode child;
 
   public GroupByLevelNode(
       PlanNodeId id,
@@ -74,7 +84,22 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
     this.child = child;
     this.groupByLevels = groupByLevels;
     this.groupedPathMap = groupedPathMap;
-    this.columnHeaders = groupedPathMap.values().stream().distinct().collect(Collectors.toList());
+    this.outputColumnHeaders =
+        groupedPathMap.values().stream().distinct().collect(Collectors.toList());
+  }
+
+  public GroupByLevelNode(
+      PlanNodeId id,
+      PlanNode child,
+      int[] groupByLevels,
+      List<AggregationType> aggregateFuncList,
+      List<OutputColumn> outputColumns) {
+    super(id);
+    this.child = child;
+    this.groupByLevels = groupByLevels;
+    this.aggregateFuncList = aggregateFuncList;
+    this.outputColumns = outputColumns;
+    // TODO: init outputColumnHeaders
   }
 
   @Override
@@ -103,17 +128,21 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return columnHeaders;
+    return outputColumnHeaders;
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnType)
+        .collect(Collectors.toList());
   }
 
   @Override
@@ -125,13 +154,16 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.GROUP_BY_LEVEL.serialize(byteBuffer);
     ReadWriteIOUtils.write(groupByLevels.length, byteBuffer);
-    for (int i = 0; i < groupByLevels.length; i++) {
-      ReadWriteIOUtils.write(groupByLevels[i], byteBuffer);
+    for (int level : groupByLevels) {
+      ReadWriteIOUtils.write(level, byteBuffer);
+    }
+    ReadWriteIOUtils.write(aggregateFuncList.size(), byteBuffer);
+    for (AggregationType aggregationType : aggregateFuncList) {
+      ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
     }
-    ReadWriteIOUtils.write(groupedPathMap.size(), byteBuffer);
-    for (Map.Entry<ColumnHeader, ColumnHeader> e : groupedPathMap.entrySet()) {
-      e.getKey().serialize(byteBuffer);
-      e.getValue().serialize(byteBuffer);
+    ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+    for (OutputColumn outputColumn : outputColumns) {
+      outputColumn.serialize(byteBuffer);
     }
   }
 
@@ -141,14 +173,18 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
     for (int i = 0; i < groupByLevelSize; i++) {
       groupByLevels[i] = ReadWriteIOUtils.readInt(byteBuffer);
     }
-    int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
-    Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
-    for (int i = 0; i < mapSize; i++) {
-      groupedPathMap.put(
-          ColumnHeader.deserialize(byteBuffer), ColumnHeader.deserialize(byteBuffer));
+    int aggregateFuncListSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<AggregationType> aggregateFuncList = new ArrayList<>(aggregateFuncListSize);
+    for (int i = 0; i < aggregateFuncListSize; i++) {
+      aggregateFuncList.add(AggregationType.values()[ReadWriteIOUtils.readInt(byteBuffer)]);
+    }
+    int outputColumnsSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<OutputColumn> outputColumns = new ArrayList<>(outputColumnsSize);
+    for (int i = 0; i < outputColumnsSize; i++) {
+      outputColumns.add(OutputColumn.deserialize(byteBuffer));
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new GroupByLevelNode(planNodeId, null, groupByLevels, groupedPathMap);
+    return new GroupByLevelNode(planNodeId, null, groupByLevels, aggregateFuncList, outputColumns);
   }
 
   @TestOnly
@@ -173,12 +209,13 @@ public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
     GroupByLevelNode that = (GroupByLevelNode) o;
     return Objects.equals(child, that.child)
         && Arrays.equals(groupByLevels, that.groupByLevels)
-        && Objects.equals(groupedPathMap, that.groupedPathMap);
+        && Objects.equals(aggregateFuncList, that.aggregateFuncList)
+        && Objects.equals(outputColumns, that.outputColumns);
   }
 
   @Override
   public int hashCode() {
-    int result = Objects.hash(child, groupedPathMap);
+    int result = Objects.hash(super.hashCode(), aggregateFuncList, outputColumns);
     result = 31 * result + Arrays.hashCode(groupByLevels);
     return result;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index ae0291edb1..fd7d9b1477 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -37,7 +36,7 @@ import java.util.List;
 import java.util.Objects;
 
 /** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode implements IOutputPlanNode {
+public class LimitNode extends ProcessNode {
 
   // The limit count
   private final int limit;
@@ -75,17 +74,17 @@ public class LimitNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+    return child.getOutputColumnHeaders();
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return ((IOutputPlanNode) child).getOutputColumnNames();
+    return child.getOutputColumnNames();
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return ((IOutputPlanNode) child).getOutputColumnTypes();
+    return child.getOutputColumnTypes();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 15544e4f64..88f26b81d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -40,7 +39,7 @@ import java.util.Objects;
  * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
  * upstream nodes
  */
-public class OffsetNode extends ProcessNode implements IOutputPlanNode {
+public class OffsetNode extends ProcessNode {
 
   // The limit count
   private PlanNode child;
@@ -78,17 +77,17 @@ public class OffsetNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+    return child.getOutputColumnHeaders();
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return ((IOutputPlanNode) child).getOutputColumnNames();
+    return child.getOutputColumnNames();
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return ((IOutputPlanNode) child).getOutputColumnTypes();
+    return child.getOutputColumnTypes();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index c25262c723..2973ae400e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -41,13 +40,13 @@ import java.util.Objects;
  * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
  * optimized logical query plan, the sortNode should not appear.
  */
-public class SortNode extends ProcessNode implements IOutputPlanNode {
+public class SortNode extends ProcessNode {
 
   private PlanNode child;
 
   private final List<String> orderBy;
 
-  private OrderBy sortOrder;
+  private final OrderBy sortOrder;
 
   public SortNode(PlanNodeId id, List<String> orderBy, OrderBy sortOrder) {
     super(id);
@@ -82,17 +81,17 @@ public class SortNode extends ProcessNode implements IOutputPlanNode {
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return ((IOutputPlanNode) child).getOutputColumnHeaders();
+    return child.getOutputColumnHeaders();
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return ((IOutputPlanNode) child).getOutputColumnNames();
+    return child.getOutputColumnNames();
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return ((IOutputPlanNode) child).getOutputColumnTypes();
+    return child.getOutputColumnTypes();
   }
 
   public OrderBy getSortOrder() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index 455ace6a6b..b34dd68bda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -20,14 +20,13 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
-import org.apache.iotdb.db.mpp.sql.planner.plan.OutputColumn;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -45,7 +44,7 @@ import java.util.stream.Collectors;
  * TimeJoinOperator is sorted by timestamp
  */
 // TODO: define the TimeJoinMergeNode for distributed plan
-public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
+public class TimeJoinNode extends ProcessNode {
 
   // This parameter indicates the order when executing multiway merge sort.
   private final OrderBy mergeOrder;
@@ -54,16 +53,17 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
   // The without policy is able to be push down to the TimeJoinOperator because we can know whether
   // a row contains
   // null or not.
-  private FilterNullPolicy filterNullPolicy = FilterNullPolicy.NO_FILTER;
-
-  private List<PlanNode> children;
-
-  private List<ColumnHeader> columnHeaders = new ArrayList<>();
+  private FilterNullParameter filterNullParameter;
 
   // indicate each output column should use which value column of which input TsBlock and the
   // overlapped situation
   // size of outputColumns must be equal to the size of columnHeaders
-  private List<OutputColumn> outputColumns;
+  private List<OutputColumn> outputColumns = new ArrayList<>();
+
+  // column name and datatype of each output column
+  private final List<ColumnHeader> outputColumnHeaders = new ArrayList<>();
+
+  private List<PlanNode> children;
 
   public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder) {
     super(id);
@@ -74,12 +74,7 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
   public TimeJoinNode(PlanNodeId id, OrderBy mergeOrder, List<PlanNode> children) {
     this(id, mergeOrder);
     this.children = children;
-    initColumnHeaders();
-    outputColumns = new ArrayList<>(columnHeaders.size());
-    // TODO need to be fixed by minghui, construct outputColumns in LogicalPlanner
-    for (int i = 0; i < columnHeaders.size(); i++) {
-      outputColumns.add(new OutputColumn(new InputLocation(i, 0)));
-    }
+    initOutputColumns();
   }
 
   @Override
@@ -91,7 +86,7 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
   public PlanNode clone() {
     // TODO: (xingtanzjr)
     TimeJoinNode cloneNode = new TimeJoinNode(getPlanNodeId(), this.mergeOrder);
-    cloneNode.columnHeaders = this.columnHeaders;
+    cloneNode.outputColumns = this.outputColumns;
     return cloneNode;
   }
 
@@ -100,25 +95,40 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
     return CHILD_COUNT_NO_LIMIT;
   }
 
-  private void initColumnHeaders() {
-    for (PlanNode child : children) {
-      columnHeaders.addAll(((IOutputPlanNode) child).getOutputColumnHeaders());
+  public List<OutputColumn> getOutputColumns() {
+    return outputColumns;
+  }
+
+  private void initOutputColumns() {
+    for (int tsBlockIndex = 0; tsBlockIndex < children.size(); tsBlockIndex++) {
+      List<ColumnHeader> childColumnHeaders = children.get(tsBlockIndex).getOutputColumnHeaders();
+      for (int valueColumnIndex = 0;
+          valueColumnIndex < childColumnHeaders.size();
+          valueColumnIndex++) {
+        InputLocation inputLocation = new InputLocation(tsBlockIndex, valueColumnIndex);
+        outputColumns.add(new OutputColumn(inputLocation));
+      }
+      outputColumnHeaders.addAll(childColumnHeaders);
     }
   }
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return columnHeaders;
+    return outputColumnHeaders;
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnType)
+        .collect(Collectors.toList());
   }
 
   @Override
@@ -130,30 +140,35 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.TIME_JOIN.serialize(byteBuffer);
     ReadWriteIOUtils.write(mergeOrder.ordinal(), byteBuffer);
-    ReadWriteIOUtils.write(filterNullPolicy.ordinal(), byteBuffer);
-    ReadWriteIOUtils.write(columnHeaders.size(), byteBuffer);
-    for (ColumnHeader columnHeader : columnHeaders) {
+    filterNullParameter.serialize(byteBuffer);
+    ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+    for (OutputColumn outputColumn : outputColumns) {
+      outputColumn.serialize(byteBuffer);
+    }
+    ReadWriteIOUtils.write(outputColumnHeaders.size(), byteBuffer);
+    for (ColumnHeader columnHeader : outputColumnHeaders) {
       columnHeader.serialize(byteBuffer);
     }
   }
 
-  public List<OutputColumn> getOutputColumns() {
-    return outputColumns;
-  }
-
   public static TimeJoinNode deserialize(ByteBuffer byteBuffer) {
     OrderBy orderBy = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
-    FilterNullPolicy filterNullPolicy =
-        FilterNullPolicy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
-    int columnHeaderSize = ReadWriteIOUtils.readInt(byteBuffer);
-    List<ColumnHeader> columnHeaders = new ArrayList<>();
-    for (int i = 0; i < columnHeaderSize; i++) {
-      columnHeaders.add(ColumnHeader.deserialize(byteBuffer));
+    FilterNullParameter filterNullParameter = FilterNullParameter.deserialize(byteBuffer);
+    int outputColumnSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<OutputColumn> outputColumns = new ArrayList<>(outputColumnSize);
+    for (int i = 0; i < outputColumnSize; i++) {
+      outputColumns.add(OutputColumn.deserialize(byteBuffer));
+    }
+    int outputColumnHeadersSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<ColumnHeader> outputColumnHeaders = new ArrayList<>(outputColumnHeadersSize);
+    for (int i = 0; i < outputColumnHeadersSize; i++) {
+      outputColumnHeaders.add(ColumnHeader.deserialize(byteBuffer));
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     TimeJoinNode timeJoinNode = new TimeJoinNode(planNodeId, orderBy);
-    timeJoinNode.columnHeaders.addAll(columnHeaders);
-    timeJoinNode.filterNullPolicy = filterNullPolicy;
+    timeJoinNode.outputColumns.addAll(outputColumns);
+    timeJoinNode.outputColumnHeaders.addAll(outputColumnHeaders);
+    timeJoinNode.setFilterNullParameter(filterNullParameter);
 
     return timeJoinNode;
   }
@@ -171,12 +186,12 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
     return mergeOrder;
   }
 
-  public FilterNullPolicy getFilterNullPolicy() {
-    return filterNullPolicy;
+  public FilterNullParameter getFilterNullParameter() {
+    return filterNullParameter;
   }
 
-  public void setWithoutPolicy(FilterNullPolicy filterNullPolicy) {
-    this.filterNullPolicy = filterNullPolicy;
+  public void setFilterNullParameter(FilterNullParameter filterNullParameter) {
+    this.filterNullParameter = filterNullParameter;
   }
 
   public String toString() {
@@ -190,7 +205,9 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
     attributes.add("MergeOrder: " + (this.getMergeOrder() == null ? "null" : this.getMergeOrder()));
     attributes.add(
         "FilterNullPolicy: "
-            + (this.getFilterNullPolicy() == null ? "null" : this.getFilterNullPolicy()));
+            + (this.getFilterNullParameter() == null
+                ? "null"
+                : this.getFilterNullParameter().getFilterNullPolicy()));
     return new Pair<>(title, attributes);
   }
 
@@ -206,12 +223,12 @@ public class TimeJoinNode extends ProcessNode implements IOutputPlanNode {
 
     TimeJoinNode that = (TimeJoinNode) o;
     return mergeOrder == that.mergeOrder
-        && filterNullPolicy == that.filterNullPolicy
+        && Objects.equals(filterNullParameter, that.filterNullParameter)
         && Objects.equals(children, that.children);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(mergeOrder, filterNullPolicy, children);
+    return Objects.hash(mergeOrder, filterNullParameter, children);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index c32b2b2062..48ce66b0ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -20,10 +20,12 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
@@ -83,6 +85,21 @@ public class FragmentSinkNode extends SinkNode {
     return ONE_CHILD;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   public static FragmentSinkNode deserialize(ByteBuffer byteBuffer) {
     TEndPoint downStreamEndpoint =
         new TEndPoint(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 88bb0742bb..dc58e42413 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -24,12 +24,12 @@ import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.Aggregation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -67,28 +67,31 @@ import java.util.stream.Collectors;
  * represent the whole aggregation result of this series. And the timestamp will be 0, which is
  * meaningless.
  */
-public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNode {
+public class SeriesAggregateScanNode extends SourceNode {
 
   // The series path and aggregation functions on this series.
   // (Currently, we only support one series in the aggregation function)
   private final PartialPath seriesPath;
-  private final List<AggregationType> aggregateFuncList;
+  @Deprecated private final List<AggregationType> aggregateFuncList;
+  private List<Aggregation> aggregationList;
 
   // all the sensors in seriesPath's device of current query
-  private Set<String> allSensors;
+  private final Set<String> allSensors;
 
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
   private final OrderBy scanOrder;
 
+  // time filter for current series, could be null if doesn't exist
   private final Filter timeFilter;
 
   // The parameter of `group by time`
   // Its value will be null if there is no `group by time` clause,
-  private final GroupByTimeComponent groupByTimeParameter;
+  private final GroupByTimeParameter groupByTimeParameter;
 
-  private List<ColumnHeader> columnHeaders;
+  // contain output column headers of this node
+  private final List<ColumnHeader> outputColumnHeaders;
 
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
@@ -100,7 +103,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
       List<AggregationType> aggregateFuncList,
       OrderBy scanOrder,
       Filter timeFilter,
-      GroupByTimeComponent groupByTimeParameter) {
+      GroupByTimeParameter groupByTimeParameter) {
     super(id);
     this.seriesPath = seriesPath;
     this.allSensors = allSensors;
@@ -108,7 +111,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
     this.scanOrder = scanOrder;
     this.timeFilter = timeFilter;
     this.groupByTimeParameter = groupByTimeParameter;
-    this.columnHeaders =
+    this.outputColumnHeaders =
         aggregateFuncList.stream()
             .map(
                 functionType ->
@@ -129,7 +132,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
     return timeFilter;
   }
 
-  public GroupByTimeComponent getGroupByTimeParameter() {
+  public GroupByTimeParameter getGroupByTimeParameter() {
     return groupByTimeParameter;
   }
 
@@ -153,17 +156,21 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return columnHeaders;
+    return outputColumnHeaders;
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnName).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return columnHeaders.stream().map(ColumnHeader::getColumnType).collect(Collectors.toList());
+    return outputColumnHeaders.stream()
+        .map(ColumnHeader::getColumnType)
+        .collect(Collectors.toList());
   }
 
   @Override
@@ -227,7 +234,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
     OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     Filter timeFilter = FilterFactory.deserialize(byteBuffer);
 
-    GroupByTimeComponent groupByTimeComponent = GroupByTimeComponent.deserialize(byteBuffer);
+    GroupByTimeParameter groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
     TRegionReplicaSet regionReplicaSet = ThriftCommonsSerDeUtils.readTRegionReplicaSet(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     SeriesAggregateScanNode seriesAggregateScanNode =
@@ -238,7 +245,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
             aggregateFuncList,
             scanOrder,
             timeFilter,
-            groupByTimeComponent);
+            groupByTimeParameter);
     seriesAggregateScanNode.regionReplicaSet = regionReplicaSet;
     return seriesAggregateScanNode;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 76327e002f..adc814e043 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.sql.planner.plan.IOutputPlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
@@ -52,7 +51,7 @@ import java.util.Set;
  *
  * <p>Children type: no child is allowed for SeriesScanNode
  */
-public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
+public class SeriesScanNode extends SourceNode {
 
   // The path of the target series which will be scanned.
   private final PartialPath seriesPath;
@@ -77,7 +76,8 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
   // offset for result set. The default value is 0
   private int offset;
 
-  private ColumnHeader columnHeader;
+  // contain output column header of this node
+  private ColumnHeader outputColumnHeader;
 
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
@@ -93,7 +93,8 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
     this.seriesPath = seriesPath;
     this.allSensors = allSensors;
     this.scanOrder = scanOrder;
-    this.columnHeader = new ColumnHeader(seriesPath.getFullPath(), seriesPath.getSeriesType());
+    this.outputColumnHeader =
+        new ColumnHeader(seriesPath.getFullPath(), seriesPath.getSeriesType());
   }
 
   public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, TRegionReplicaSet regionReplicaSet) {
@@ -161,17 +162,17 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
 
   @Override
   public List<ColumnHeader> getOutputColumnHeaders() {
-    return ImmutableList.of(columnHeader);
+    return ImmutableList.of(outputColumnHeader);
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return ImmutableList.of(columnHeader.getColumnName());
+    return ImmutableList.of(outputColumnHeader.getColumnName());
   }
 
   @Override
   public List<TSDataType> getOutputColumnTypes() {
-    return ImmutableList.of(columnHeader.getColumnType());
+    return ImmutableList.of(outputColumnHeader.getColumnType());
   }
 
   public Set<String> getAllSensors() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index ee66e98396..2c15674023 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -160,6 +162,21 @@ public class InsertMultiTabletsNode extends InsertNode {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   public static InsertMultiTabletsNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index eb25abadfe..32ed159e1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -102,6 +103,21 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   public int serializedSize() {
     int size = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 62fad82b81..a2e2ebe4c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -22,11 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -123,6 +125,21 @@ public class InsertRowsNode extends InsertNode {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   public static InsertRowsNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index f83912efa9..40b6d7f4b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -22,11 +22,13 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -104,6 +106,21 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
     return null;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index e03cfcaab1..15ed741d5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -153,6 +154,21 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
     return NO_CHILD_ALLOWED;
   }
 
+  @Override
+  public List<ColumnHeader> getOutputColumnHeaders() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+
+  @Override
+  public List<TSDataType> getOutputColumnTypes() {
+    return null;
+  }
+
   @Override
   public int serializedSize() {
     return serializedSize(0, rowCount);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/Aggregation.java
similarity index 60%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/Aggregation.java
index 661066a314..62d08ab319 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IOutputPlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/Aggregation.java
@@ -17,18 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
 
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
 
-public interface IOutputPlanNode {
+public class Aggregation {
 
-  List<ColumnHeader> getOutputColumnHeaders();
+  // aggregation function name
+  private AggregationType aggregationType;
 
-  List<String> getOutputColumnNames();
+  // indicate the input and output type
+  private AggregationStep step;
 
-  List<TSDataType> getOutputColumnTypes();
+  // indicate this aggregation should use which value column of which input TsBlock
+  private List<InputLocation> inputLocations;
+
+  // datatype of each input value column
+  private List<TSDataType> inputDateTypes;
+
+  // datatype of output value column
+  private TSDataType outputDateType;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationStep.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationStep.java
new file mode 100644
index 0000000000..bf9b78546d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationStep.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
+
+/**
+ * This attribute indicates the input and output type of the {@code Aggregator}.
+ *
+ * <p>There are three types of input/output:
+ *
+ * <ul>
+ *   <li>Raw: raw data, as input only
+ *   <li>Partial: intermediate aggregation result
+ *   <li>Final: final aggregation result, as output only
+ * </ul>
+ */
+public enum AggregationStep {
+
+  // input Raw, output Partial
+  PARTIAL(true, true),
+  // input Partial, output Final
+  FINAL(false, false),
+  // input Partial, output Partial
+  INTERMEDIATE(false, true),
+  // input Raw, output Final
+  SINGLE(true, false);
+
+  private final boolean inputRaw;
+  private final boolean outputPartial;
+
+  AggregationStep(boolean inputRaw, boolean outputPartial) {
+    this.inputRaw = inputRaw;
+    this.outputPartial = outputPartial;
+  }
+
+  public boolean isInputRaw() {
+    return inputRaw;
+  }
+
+  public boolean isOutputPartial() {
+    return outputPartial;
+  }
+
+  public static AggregationStep partialOutput(AggregationStep step) {
+    if (step.isInputRaw()) {
+      return AggregationStep.PARTIAL;
+    }
+    return AggregationStep.INTERMEDIATE;
+  }
+
+  public static AggregationStep partialInput(AggregationStep step) {
+    if (step.isOutputPartial()) {
+      return AggregationStep.INTERMEDIATE;
+    }
+    return AggregationStep.FINAL;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/FilterNullParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/FilterNullParameter.java
new file mode 100644
index 0000000000..a66a7cdd90
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/FilterNullParameter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
+
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class FilterNullParameter {
+
+  // The policy to discard the result from upstream npde
+  private final FilterNullPolicy filterNullPolicy;
+
+  // indicate columns used to filter null
+  private final List<InputLocation> filterNullColumns;
+
+  public FilterNullParameter(
+      FilterNullPolicy filterNullPolicy, List<InputLocation> filterNullColumns) {
+    this.filterNullPolicy = filterNullPolicy;
+    this.filterNullColumns = filterNullColumns;
+  }
+
+  public FilterNullPolicy getFilterNullPolicy() {
+    return filterNullPolicy;
+  }
+
+  public List<InputLocation> getFilterNullColumns() {
+    return filterNullColumns;
+  }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(filterNullPolicy.ordinal(), byteBuffer);
+    ReadWriteIOUtils.write(filterNullColumns.size(), byteBuffer);
+    for (InputLocation filterNullColumn : filterNullColumns) {
+      filterNullColumn.serialize(byteBuffer);
+    }
+  }
+
+  public static FilterNullParameter deserialize(ByteBuffer byteBuffer) {
+    FilterNullPolicy filterNullPolicy =
+        FilterNullPolicy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<InputLocation> filterNullColumns = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      filterNullColumns.add(InputLocation.deserialize(byteBuffer));
+    }
+    return new FilterNullParameter(filterNullPolicy, filterNullColumns);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    FilterNullParameter that = (FilterNullParameter) o;
+    return filterNullPolicy == that.filterNullPolicy
+        && Objects.equals(filterNullColumns, that.filterNullColumns);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(filterNullPolicy, filterNullColumns);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/GroupByTimeParameter.java
similarity index 75%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/GroupByTimeParameter.java
index 026297a331..9fa368a67f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/GroupByTimeParameter.java
@@ -17,16 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.statement.component;
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
 
-import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-/** This class maintains information of {@code GROUP BY} clause. */
-public class GroupByTimeComponent extends StatementNode {
+/** The parameter of `GROUP BY TIME` */
+public class GroupByTimeParameter {
 
   // [startTime, endTime)
   private long startTime;
@@ -45,14 +44,14 @@ public class GroupByTimeComponent extends StatementNode {
   // if it is left close and right open interval
   private boolean leftCRightO;
 
-  public GroupByTimeComponent() {}
+  public GroupByTimeParameter() {}
 
-  public GroupByTimeComponent(
+  public GroupByTimeParameter(
       long startTime, long endTime, long interval, long slidingStep, boolean leftCRightO) {
     this(startTime, endTime, interval, slidingStep, false, false, leftCRightO);
   }
 
-  public GroupByTimeComponent(
+  public GroupByTimeParameter(
       long startTime,
       long endTime,
       long interval,
@@ -69,22 +68,6 @@ public class GroupByTimeComponent extends StatementNode {
     this.leftCRightO = leftCRightO;
   }
 
-  public boolean isLeftCRightO() {
-    return leftCRightO;
-  }
-
-  public void setLeftCRightO(boolean leftCRightO) {
-    this.leftCRightO = leftCRightO;
-  }
-
-  public long getInterval() {
-    return interval;
-  }
-
-  public void setInterval(long interval) {
-    this.interval = interval;
-  }
-
   public long getStartTime() {
     return startTime;
   }
@@ -101,6 +84,14 @@ public class GroupByTimeComponent extends StatementNode {
     this.endTime = endTime;
   }
 
+  public long getInterval() {
+    return interval;
+  }
+
+  public void setInterval(long interval) {
+    this.interval = interval;
+  }
+
   public long getSlidingStep() {
     return slidingStep;
   }
@@ -109,20 +100,28 @@ public class GroupByTimeComponent extends StatementNode {
     this.slidingStep = slidingStep;
   }
 
+  public boolean isIntervalByMonth() {
+    return isIntervalByMonth;
+  }
+
+  public void setIntervalByMonth(boolean intervalByMonth) {
+    isIntervalByMonth = intervalByMonth;
+  }
+
   public boolean isSlidingStepByMonth() {
     return isSlidingStepByMonth;
   }
 
-  public void setSlidingStepByMonth(boolean isSlidingStepByMonth) {
-    this.isSlidingStepByMonth = isSlidingStepByMonth;
+  public void setSlidingStepByMonth(boolean slidingStepByMonth) {
+    isSlidingStepByMonth = slidingStepByMonth;
   }
 
-  public boolean isIntervalByMonth() {
-    return isIntervalByMonth;
+  public boolean isLeftCRightO() {
+    return leftCRightO;
   }
 
-  public void setIntervalByMonth(boolean isIntervalByMonth) {
-    this.isIntervalByMonth = isIntervalByMonth;
+  public void setLeftCRightO(boolean leftCRightO) {
+    this.leftCRightO = leftCRightO;
   }
 
   public void serialize(ByteBuffer buffer) {
@@ -135,23 +134,23 @@ public class GroupByTimeComponent extends StatementNode {
     ReadWriteIOUtils.write(leftCRightO, buffer);
   }
 
-  public static GroupByTimeComponent deserialize(ByteBuffer buffer) {
-    GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
-    groupByTimeComponent.setStartTime(ReadWriteIOUtils.readLong(buffer));
-    groupByTimeComponent.setEndTime(ReadWriteIOUtils.readLong(buffer));
-    groupByTimeComponent.setInterval(ReadWriteIOUtils.readLong(buffer));
-    groupByTimeComponent.setSlidingStep(ReadWriteIOUtils.readLong(buffer));
-    groupByTimeComponent.setIntervalByMonth(ReadWriteIOUtils.readBool(buffer));
-    groupByTimeComponent.setSlidingStepByMonth(ReadWriteIOUtils.readBool(buffer));
-    groupByTimeComponent.setLeftCRightO(ReadWriteIOUtils.readBool(buffer));
-    return groupByTimeComponent;
+  public static GroupByTimeParameter deserialize(ByteBuffer buffer) {
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter();
+    groupByTimeParameter.setStartTime(ReadWriteIOUtils.readLong(buffer));
+    groupByTimeParameter.setEndTime(ReadWriteIOUtils.readLong(buffer));
+    groupByTimeParameter.setInterval(ReadWriteIOUtils.readLong(buffer));
+    groupByTimeParameter.setSlidingStep(ReadWriteIOUtils.readLong(buffer));
+    groupByTimeParameter.setIntervalByMonth(ReadWriteIOUtils.readBool(buffer));
+    groupByTimeParameter.setSlidingStepByMonth(ReadWriteIOUtils.readBool(buffer));
+    groupByTimeParameter.setLeftCRightO(ReadWriteIOUtils.readBool(buffer));
+    return groupByTimeParameter;
   }
 
   public boolean equals(Object obj) {
-    if (!(obj instanceof GroupByTimeComponent)) {
+    if (!(obj instanceof GroupByTimeParameter)) {
       return false;
     }
-    GroupByTimeComponent other = (GroupByTimeComponent) obj;
+    GroupByTimeParameter other = (GroupByTimeParameter) obj;
     return this.startTime == other.startTime
         && this.endTime == other.endTime
         && this.interval == other.interval
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/InputLocation.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
similarity index 55%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/InputLocation.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
index 705d8cbace..fda0f4a957 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/InputLocation.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
@@ -16,7 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class InputLocation {
   // which input tsblock
@@ -36,4 +41,32 @@ public class InputLocation {
   public int getValueColumnIndex() {
     return valueColumnIndex;
   }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(tsBlockIndex, byteBuffer);
+    ReadWriteIOUtils.write(valueColumnIndex, byteBuffer);
+  }
+
+  public static InputLocation deserialize(ByteBuffer byteBuffer) {
+    int tsBlockIndex = ReadWriteIOUtils.readInt(byteBuffer);
+    int valueColumnIndex = ReadWriteIOUtils.readInt(byteBuffer);
+    return new InputLocation(tsBlockIndex, valueColumnIndex);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    InputLocation that = (InputLocation) o;
+    return tsBlockIndex == that.tsBlockIndex && valueColumnIndex == that.valueColumnIndex;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tsBlockIndex, valueColumnIndex);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/OutputColumn.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/OutputColumn.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/OutputColumn.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/OutputColumn.java
index 168027ef4f..f5e46777b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/OutputColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/OutputColumn.java
@@ -16,10 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.planner.plan.parameter;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import com.google.common.collect.ImmutableList;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -66,4 +69,23 @@ public class OutputColumn {
     checkArgument(index < sourceLocations.size(), "index is not valid");
     return sourceLocations.get(index);
   }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(sourceLocations.size(), byteBuffer);
+    for (InputLocation sourceLocation : sourceLocations) {
+      sourceLocation.serialize(byteBuffer);
+    }
+    ReadWriteIOUtils.write(overlapped, byteBuffer);
+  }
+
+  public static OutputColumn deserialize(ByteBuffer byteBuffer) {
+    int sourceLocationSize = ReadWriteIOUtils.readInt(byteBuffer);
+    ImmutableList.Builder<InputLocation> sourceLocations = ImmutableList.builder();
+    while (sourceLocationSize > 0) {
+      sourceLocations.add(InputLocation.deserialize(byteBuffer));
+      sourceLocationSize--;
+    }
+    boolean overlapped = ReadWriteIOUtils.readBool(byteBuffer);
+    return new OutputColumn(sourceLocations.build(), overlapped);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
index 026297a331..dbb2daa415 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
@@ -20,10 +20,6 @@
 package org.apache.iotdb.db.mpp.sql.statement.component;
 
 import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import java.nio.ByteBuffer;
-import java.util.Objects;
 
 /** This class maintains information of {@code GROUP BY} clause. */
 public class GroupByTimeComponent extends StatementNode {
@@ -47,28 +43,6 @@ public class GroupByTimeComponent extends StatementNode {
 
   public GroupByTimeComponent() {}
 
-  public GroupByTimeComponent(
-      long startTime, long endTime, long interval, long slidingStep, boolean leftCRightO) {
-    this(startTime, endTime, interval, slidingStep, false, false, leftCRightO);
-  }
-
-  public GroupByTimeComponent(
-      long startTime,
-      long endTime,
-      long interval,
-      long slidingStep,
-      boolean isIntervalByMonth,
-      boolean isSlidingStepByMonth,
-      boolean leftCRightO) {
-    this.startTime = startTime;
-    this.endTime = endTime;
-    this.interval = interval;
-    this.slidingStep = slidingStep;
-    this.isIntervalByMonth = isIntervalByMonth;
-    this.isSlidingStepByMonth = isSlidingStepByMonth;
-    this.leftCRightO = leftCRightO;
-  }
-
   public boolean isLeftCRightO() {
     return leftCRightO;
   }
@@ -124,51 +98,4 @@ public class GroupByTimeComponent extends StatementNode {
   public void setIntervalByMonth(boolean isIntervalByMonth) {
     this.isIntervalByMonth = isIntervalByMonth;
   }
-
-  public void serialize(ByteBuffer buffer) {
-    ReadWriteIOUtils.write(startTime, buffer);
-    ReadWriteIOUtils.write(endTime, buffer);
-    ReadWriteIOUtils.write(interval, buffer);
-    ReadWriteIOUtils.write(slidingStep, buffer);
-    ReadWriteIOUtils.write(isIntervalByMonth, buffer);
-    ReadWriteIOUtils.write(isSlidingStepByMonth, buffer);
-    ReadWriteIOUtils.write(leftCRightO, buffer);
-  }
-
-  public static GroupByTimeComponent deserialize(ByteBuffer buffer) {
-    GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
-    groupByTimeComponent.setStartTime(ReadWriteIOUtils.readLong(buffer));
-    groupByTimeComponent.setEndTime(ReadWriteIOUtils.readLong(buffer));
-    groupByTimeComponent.setInterval(ReadWriteIOUtils.readLong(buffer));
-    groupByTimeComponent.setSlidingStep(ReadWriteIOUtils.readLong(buffer));
-    groupByTimeComponent.setIntervalByMonth(ReadWriteIOUtils.readBool(buffer));
-    groupByTimeComponent.setSlidingStepByMonth(ReadWriteIOUtils.readBool(buffer));
-    groupByTimeComponent.setLeftCRightO(ReadWriteIOUtils.readBool(buffer));
-    return groupByTimeComponent;
-  }
-
-  public boolean equals(Object obj) {
-    if (!(obj instanceof GroupByTimeComponent)) {
-      return false;
-    }
-    GroupByTimeComponent other = (GroupByTimeComponent) obj;
-    return this.startTime == other.startTime
-        && this.endTime == other.endTime
-        && this.interval == other.interval
-        && this.slidingStep == other.slidingStep
-        && this.isSlidingStepByMonth == other.isSlidingStepByMonth
-        && this.isIntervalByMonth == other.isIntervalByMonth
-        && this.leftCRightO == other.leftCRightO;
-  }
-
-  public int hashCode() {
-    return Objects.hash(
-        startTime,
-        endTime,
-        interval,
-        slidingStep,
-        isIntervalByMonth,
-        isSlidingStepByMonth,
-        leftCRightO);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 8712dd5641..35da48e4b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -80,6 +81,8 @@ public class FunctionExpression extends Expression {
    */
   private List<Expression> expressions;
 
+  private List<InputLocation> inputLocations;
+
   private List<PartialPath> paths;
 
   private String parametersString;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 4e9cfb5e11..2585107554 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -49,7 +50,9 @@ import java.util.Set;
 
 public class TimeSeriesOperand extends Expression {
 
-  protected PartialPath path;
+  private PartialPath path;
+
+  private InputLocation inputLocation;
 
   public TimeSeriesOperand(PartialPath path) {
     this.path = path;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 9b93d163ce..7596bde6d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -35,8 +35,8 @@ import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index 282c2895b5..b8591c8fdb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index d2045e0d16..09d56b7211 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -214,7 +214,7 @@ public class SeriesAggregateScanOperatorTest {
   @Test
   public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
     int[] result = new int[] {100, 100, 100, 100};
-    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
         initSeriesAggregateScanOperator(
             Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
@@ -232,7 +232,7 @@ public class SeriesAggregateScanOperatorTest {
   public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
     int[] result = new int[] {0, 80, 100, 80};
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
-    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
         initSeriesAggregateScanOperator(
             Collections.singletonList(AggregationType.COUNT),
@@ -263,7 +263,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.LAST_VALUE);
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
-    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
         initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
     int count = 0;
@@ -282,7 +282,7 @@ public class SeriesAggregateScanOperatorTest {
   @Test
   public void testGroupBySlidingTimeWindow() throws IllegalPathException {
     int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
-    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 50, true);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
         initSeriesAggregateScanOperator(
             Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
@@ -300,7 +300,7 @@ public class SeriesAggregateScanOperatorTest {
   public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
     int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
     int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
-    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 149, 50, 30, true);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
         initSeriesAggregateScanOperator(
             Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
@@ -329,7 +329,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.LAST_VALUE);
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
-    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 149, 50, 30, true);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
         initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
     int count = 0;
@@ -349,7 +349,7 @@ public class SeriesAggregateScanOperatorTest {
       List<AggregationType> aggregateFuncList,
       Filter timeFilter,
       boolean ascending,
-      GroupByTimeComponent groupByTimeParameter)
+      GroupByTimeParameter groupByTimeParameter)
       throws IllegalPathException {
     MeasurementPath measurementPath =
         new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
index 706ba38a93..9cd3b4bd86 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.mpp.operator;
 
 import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index a4b8c3bcc3..276555839e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.InputLocation;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 5bf598cf14..ddc4945f87 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -120,7 +121,10 @@ public class FragmentInstanceSerdeTest {
 
     FilterNullNode filterNullNode =
         new FilterNullNode(
-            new PlanNodeId("TestFilterNullNode"), null, FilterNullPolicy.ALL_NULL, null);
+            new PlanNodeId("TestFilterNullNode"),
+            null,
+            FilterNullPolicy.ALL_NULL,
+            new ArrayList<>());
     IExpression expression =
         BinaryExpression.and(
             new SingleSeriesExpression(
@@ -136,7 +140,8 @@ public class FragmentInstanceSerdeTest {
 
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(new PlanNodeId("TimeJoinNode"), OrderBy.TIMESTAMP_DESC);
-    timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+    timeJoinNode.setFilterNullParameter(
+        new FilterNullParameter(FilterNullPolicy.CONTAINS_NULL, new ArrayList<>()));
     SeriesScanNode seriesScanNode1 =
         new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
     seriesScanNode1.setRegionReplicaSet(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java
index dc8646a993..fcf3221a9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/DeviceMergeNodeSerdeTest.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -67,8 +66,6 @@ public class DeviceMergeNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
     deviceMergeNode.addChildDeviceNode("device", aggregateNode);
 
     aggregateFuncMap = new HashMap<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java
index 76563e29b9..ae25b60799 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/ExchangeNodeSerdeTest.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -73,8 +72,6 @@ public class ExchangeNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
     deviceMergeNode.addChildDeviceNode("device", aggregateNode);
 
     aggregateFuncMap = new HashMap<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java
index df3a0f3d95..55a4ebc2fd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FillNodeSerdeTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -56,9 +55,6 @@ public class FillNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
     Set<AggregationType> aggregationTypes = new HashSet<>();
     aggregationTypes.add(AggregationType.MAX_TIME);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java
index b33fe9088b..686f8ce8bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNodeSerdeTest.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -67,9 +66,6 @@ public class FilterNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
     Set<AggregationType> aggregationTypes = new HashSet<>();
     aggregationTypes.add(AggregationType.MAX_TIME);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java
index f34d1ae761..ae8650840d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/FilterNullNodeSerdeTest.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -69,9 +68,6 @@ public class FilterNullNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
     Set<AggregationType> aggregationTypes = new HashSet<>();
     aggregationTypes.add(AggregationType.MAX_TIME);
@@ -104,7 +100,10 @@ public class FilterNullNodeSerdeTest {
 
     FilterNullNode filterNullNode =
         new FilterNullNode(
-            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+            new PlanNodeId("TestFilterNullNode"),
+            filterNode,
+            FilterNullPolicy.ALL_NULL,
+            new ArrayList<>());
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     filterNullNode.serialize(byteBuffer);
     byteBuffer.flip();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
index 8f4dc9a71a..8a5a61e82f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -71,9 +70,6 @@ public class GroupByLevelNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
     Set<AggregationType> aggregationTypes = new HashSet<>();
     aggregationTypes.add(AggregationType.MAX_TIME);
@@ -106,7 +102,10 @@ public class GroupByLevelNodeSerdeTest {
 
     FilterNullNode filterNullNode =
         new FilterNullNode(
-            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+            new PlanNodeId("TestFilterNullNode"),
+            filterNode,
+            FilterNullPolicy.ALL_NULL,
+            new ArrayList<>());
 
     Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
     groupedPathMap.put(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
index 2e6e84f4b1..3e1066dd86 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/LimitNodeSerdeTest.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -72,9 +71,6 @@ public class LimitNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
     Set<AggregationType> aggregationTypes = new HashSet<>();
     aggregationTypes.add(AggregationType.MAX_TIME);
@@ -107,7 +103,10 @@ public class LimitNodeSerdeTest {
 
     FilterNullNode filterNullNode =
         new FilterNullNode(
-            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+            new PlanNodeId("TestFilterNullNode"),
+            filterNode,
+            FilterNullPolicy.ALL_NULL,
+            new ArrayList<>());
 
     Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
     groupedPathMap.put(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
index 5cfa8dc015..b15e8705af 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/OffsetNodeSerdeTest.java
@@ -41,8 +41,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -51,6 +51,7 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.operator.Regexp;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -78,9 +79,6 @@ public class OffsetNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
     Set<AggregationType> aggregationTypes = new HashSet<>();
     aggregationTypes.add(AggregationType.MAX_TIME);
@@ -113,7 +111,10 @@ public class OffsetNodeSerdeTest {
 
     FilterNullNode filterNullNode =
         new FilterNullNode(
-            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+            new PlanNodeId("TestFilterNullNode"),
+            filterNode,
+            FilterNullPolicy.ALL_NULL,
+            new ArrayList<>());
 
     Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
     groupedPathMap.put(
@@ -141,7 +142,8 @@ public class OffsetNodeSerdeTest {
     OffsetNode offsetNode = new OffsetNode(new PlanNodeId("OffsetNode"), 100);
     LimitNode limitNode = new LimitNode(new PlanNodeId("LimitNode"), 100);
     FilterNullNode filterNullNode =
-        new FilterNullNode(new PlanNodeId("FilterNullNode"), FilterNullPolicy.ALL_NULL, null);
+        new FilterNullNode(
+            new PlanNodeId("FilterNullNode"), FilterNullPolicy.ALL_NULL, ImmutableList.of());
     QueryFilter queryFilter = new QueryFilter(FilterType.KW_AND);
     BasicFunctionFilter leftQueryFilter =
         new BasicFunctionFilter(FilterType.GREATERTHAN, new MeasurementPath("root.sg.d1.s2"), "10");
@@ -159,7 +161,8 @@ public class OffsetNodeSerdeTest {
 
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(new PlanNodeId("TimeJoinNode"), OrderBy.TIMESTAMP_DESC);
-    timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+    timeJoinNode.setFilterNullParameter(
+        new FilterNullParameter(FilterNullPolicy.CONTAINS_NULL, new ArrayList<>()));
     SeriesScanNode seriesScanNode1 =
         new SeriesScanNode(new PlanNodeId("SeriesScanNode1"), new MeasurementPath("root.sg.d1.s2"));
     seriesScanNode1.setRegionReplicaSet(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java
index 98f733b41d..9f914493bc 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/SortNodeSerdeTest.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -76,9 +75,6 @@ public class SortNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
     Set<AggregationType> aggregationTypes = new HashSet<>();
     aggregationTypes.add(AggregationType.MAX_TIME);
@@ -111,7 +107,10 @@ public class SortNodeSerdeTest {
 
     FilterNullNode filterNullNode =
         new FilterNullNode(
-            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+            new PlanNodeId("TestFilterNullNode"),
+            filterNode,
+            FilterNullPolicy.ALL_NULL,
+            new ArrayList<>());
 
     Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
     groupedPathMap.put(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java
index c98ddb2ef2..5f752c1061 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/process/TimeJoinNodeSerdeTest.java
@@ -39,8 +39,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.FilterNullParameter;
 import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -76,9 +76,6 @@ public class TimeJoinNodeSerdeTest {
     DeviceMergeNode deviceMergeNode =
         new DeviceMergeNode(new PlanNodeId("TestDeviceMergeNode"), OrderBy.TIMESTAMP_ASC);
 
-    FilterNullComponent filterNullComponent = new FilterNullComponent();
-    deviceMergeNode.setFilterNullComponent(filterNullComponent);
-
     Map<PartialPath, Set<AggregationType>> aggregateFuncMap = new HashMap<>();
     Set<AggregationType> aggregationTypes = new HashSet<>();
     aggregationTypes.add(AggregationType.MAX_TIME);
@@ -112,7 +109,10 @@ public class TimeJoinNodeSerdeTest {
 
     FilterNullNode filterNullNode =
         new FilterNullNode(
-            new PlanNodeId("TestFilterNullNode"), filterNode, FilterNullPolicy.ALL_NULL, null);
+            new PlanNodeId("TestFilterNullNode"),
+            filterNode,
+            FilterNullPolicy.ALL_NULL,
+            new ArrayList<>());
 
     Map<ColumnHeader, ColumnHeader> groupedPathMap = new HashMap<>();
     groupedPathMap.put(
@@ -136,7 +136,8 @@ public class TimeJoinNodeSerdeTest {
 
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(new PlanNodeId("TestTimeJoinNode"), OrderBy.TIMESTAMP_ASC);
-    timeJoinNode.setWithoutPolicy(FilterNullPolicy.CONTAINS_NULL);
+    timeJoinNode.setFilterNullParameter(
+        new FilterNullParameter(FilterNullPolicy.CONTAINS_NULL, new ArrayList<>()));
     timeJoinNode.addChild(sortNode);
     ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
     timeJoinNode.serialize(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
index 258968d6da..7271a366fa 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -53,8 +53,8 @@ public class SeriesAggregateScanNodeSerdeTest {
     st.add("s2");
     List<AggregationType> aggregateFuncList = new ArrayList<>();
     aggregateFuncList.add(AggregationType.MAX_TIME);
-    GroupByTimeComponent groupByTimeComponent =
-        new GroupByTimeComponent(1, 100, 1, 1, true, true, true);
+    GroupByTimeParameter groupByTimeComponent =
+        new GroupByTimeParameter(1, 100, 1, 1, true, true, true);
     SeriesAggregateScanNode seriesAggregateScanNode =
         new SeriesAggregateScanNode(
             new PlanNodeId("TestSeriesAggregateScanNode"),