You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:04 UTC

[iotdb] 03/12: add IntoNode & DeviceViewNode

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9ce4eacfcbf806a44f93830df73e3f377043efea
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Oct 17 22:20:17 2022 +0800

    add IntoNode & DeviceViewNode
---
 .../planner/distribution/ExchangeNodeAdder.java    |  6 +-
 .../plan/planner/distribution/SourceRewriter.java  | 11 +--
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  4 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java | 10 +++
 .../metedata/read/SchemaQueryOrderByHeatNode.java  |  4 +-
 .../planner/plan/node/process/AggregationNode.java | 23 ++---
 .../planner/plan/node/process/DeviceMergeNode.java | 32 +------
 .../{LimitNode.java => DeviceViewIntoNode.java}    | 97 +++++++++-------------
 .../planner/plan/node/process/DeviceViewNode.java  | 19 +----
 .../planner/plan/node/process/ExchangeNode.java    | 42 +---------
 .../plan/planner/plan/node/process/FillNode.java   | 46 ++--------
 .../plan/node/process/GroupByLevelNode.java        | 29 +++----
 .../planner/plan/node/process/GroupByTagNode.java  | 17 +---
 .../process/{OffsetNode.java => IntoNode.java}     | 91 +++++++++-----------
 .../plan/planner/plan/node/process/LimitNode.java  | 37 ++-------
 ...tiChildNode.java => MultiChildProcessNode.java} | 23 ++++-
 .../plan/planner/plan/node/process/OffsetNode.java | 36 ++------
 .../planner/plan/node/process/ProjectNode.java     | 40 +++------
 ...iChildNode.java => SingleChildProcessNode.java} | 50 ++++++++---
 .../node/process/SlidingWindowAggregationNode.java | 36 ++------
 .../plan/planner/plan/node/process/SortNode.java   | 29 ++-----
 .../planner/plan/node/process/TimeJoinNode.java    | 17 +---
 .../planner/plan/node/process/TransformNode.java   | 29 +------
 .../node/process/last/LastQueryCollectNode.java    |  4 +-
 .../plan/node/process/last/LastQueryMergeNode.java |  4 +-
 .../plan/node/process/last/LastQueryNode.java      |  4 +-
 26 files changed, 237 insertions(+), 503 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 37c487b4ab..60c0a6a9b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -241,8 +241,8 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return processMultiChildNode(node, context);
   }
 
-  private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
-    MultiChildNode newNode = (MultiChildNode) node.clone();
+  private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupContext context) {
+    MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
     List<PlanNode> visitedChildren = new ArrayList<>();
     node.getChildren()
         .forEach(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 8df3a16804..e6bc4891f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
@@ -254,7 +254,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   private PlanNode processRawSeriesScan(
-      SeriesSourceNode node, DistributionPlanContext context, MultiChildNode parent) {
+      SeriesSourceNode node, DistributionPlanContext context, MultiChildProcessNode parent) {
     List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
     if (sourceNodes.size() == 1) {
       return sourceNodes.get(0);
@@ -407,8 +407,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return processRawMultiChildNode(node, context);
   }
 
-  private PlanNode processRawMultiChildNode(MultiChildNode node, DistributionPlanContext context) {
-    MultiChildNode root = (MultiChildNode) node.clone();
+  private PlanNode processRawMultiChildNode(
+      MultiChildProcessNode node, DistributionPlanContext context) {
+    MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
     // Step 1: Get all source nodes. For the node which is not source, add it as the child of
     // current TimeJoinNode
     List<SourceNode> sources = new ArrayList<>();
@@ -468,7 +469,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             } else {
               // We clone a TimeJoinNode from root to make the params to be consistent.
               // But we need to assign a new ID to it
-              MultiChildNode parentOfGroup = (MultiChildNode) root.clone();
+              MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) root.clone();
               parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
               seriesScanNodes.forEach(parentOfGroup::addChild);
               root.addChild(parentOfGroup);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index dd102898d5..8b7804bfd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -140,7 +140,9 @@ public enum PlanNodeType {
   LOAD_TSFILE((short) 55),
   CONSTRUCT_SCHEMA_BLACK_LIST_NODE((short) 56),
   ROLLBACK_SCHEMA_BLACK_LIST_NODE((short) 57),
-  GROUP_BY_TAG((short) 58);
+  GROUP_BY_TAG((short) 58),
+  INTO((short) 59),
+  DEVICE_VIEW_INTO((short) 60);
 
   public static final int BYTES = Short.BYTES;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8b4155cb53..cad20d65de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -44,12 +44,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewIntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
@@ -301,4 +303,12 @@ public abstract class PlanVisitor<R, C> {
   public R visitActivateTemplate(ActivateTemplateNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitInto(IntoNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
index 68cdf909cd..0c3d78ce6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryOrderByHeatNode.java
@@ -23,14 +23,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-public class SchemaQueryOrderByHeatNode extends MultiChildNode {
+public class SchemaQueryOrderByHeatNode extends MultiChildProcessNode {
 
   public SchemaQueryOrderByHeatNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 837be95747..4448ca0c5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
  * input as a TsBlock, it may be raw data or partial aggregation result. This node will output the
  * final series aggregated result represented by TsBlock.
  */
-public class AggregationNode extends MultiChildNode {
+public class AggregationNode extends MultiChildProcessNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output as one or two column
   // of
@@ -74,8 +74,10 @@ public class AggregationNode extends MultiChildNode {
       List<AggregationDescriptor> aggregationDescriptorList,
       @Nullable GroupByTimeParameter groupByTimeParameter,
       Ordering scanOrder) {
-    this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
-    this.children = children;
+    super(id, children);
+    this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = scanOrder;
   }
 
   public List<AggregationDescriptor> getAggregationDescriptorList() {
@@ -91,21 +93,6 @@ public class AggregationNode extends MultiChildNode {
     return scanOrder;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new AggregationNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index 141530d3f4..d3ec660e7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -34,7 +34,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class DeviceMergeNode extends MultiChildNode {
+public class DeviceMergeNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by device and time.
   // The size of this list is 2 and the first SortItem in this list has higher priority.
@@ -43,16 +43,6 @@ public class DeviceMergeNode extends MultiChildNode {
   // the list of selected devices
   private final List<String> devices;
 
-  public DeviceMergeNode(
-      PlanNodeId id,
-      List<PlanNode> children,
-      OrderByParameter mergeOrderParameter,
-      List<String> devices) {
-    super(id, children);
-    this.mergeOrderParameter = mergeOrderParameter;
-    this.devices = devices;
-  }
-
   public DeviceMergeNode(
       PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> devices) {
     super(id);
@@ -68,21 +58,6 @@ public class DeviceMergeNode extends MultiChildNode {
     return devices;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(), getDevices());
@@ -147,13 +122,12 @@ public class DeviceMergeNode extends MultiChildNode {
     }
     DeviceMergeNode that = (DeviceMergeNode) o;
     return Objects.equals(mergeOrderParameter, that.mergeOrderParameter)
-        && Objects.equals(devices, that.devices)
-        && Objects.equals(children, that.children);
+        && Objects.equals(devices, that.devices);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), mergeOrderParameter, devices, children);
+    return Objects.hash(super.hashCode(), mergeOrderParameter, devices);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
index 9d3badac0e..987cbc18d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewIntoNode.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,102 +16,74 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
 
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
-/** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode {
+public class DeviceViewIntoNode extends SingleChildProcessNode {
 
-  private final int limit;
+  private final DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor;
 
-  private PlanNode child;
-
-  public LimitNode(PlanNodeId id, int limit) {
+  public DeviceViewIntoNode(
+      PlanNodeId id, DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
     super(id);
-    this.limit = limit;
-  }
-
-  public LimitNode(PlanNodeId id, PlanNode child, int limit) {
-    this(id, limit);
-    this.child = child;
-  }
-
-  public int getLimit() {
-    return limit;
-  }
-
-  public PlanNode getChild() {
-    return child;
-  }
-
-  public void setChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
+    this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
   }
 
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
+  public DeviceViewIntoNode(
+      PlanNodeId id, PlanNode child, DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
+    super(id, child);
+    this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
   }
 
   @Override
   public PlanNode clone() {
-    return new LimitNode(getPlanNodeId(), this.limit);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+    return new DeviceViewIntoNode(getPlanNodeId(), this.deviceViewIntoPathDescriptor);
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitLimit(this, context);
+    return ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.LIMIT.serialize(byteBuffer);
-    ReadWriteIOUtils.write(limit, byteBuffer);
+    PlanNodeType.DEVICE_VIEW_INTO.serialize(byteBuffer);
+    this.deviceViewIntoPathDescriptor.serialize(byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
-    PlanNodeType.LIMIT.serialize(stream);
-    ReadWriteIOUtils.write(limit, stream);
+    PlanNodeType.DEVICE_VIEW_INTO.serialize(stream);
+    this.deviceViewIntoPathDescriptor.serialize(stream);
   }
 
-  public static LimitNode deserialize(ByteBuffer byteBuffer) {
-    int limit = ReadWriteIOUtils.readInt(byteBuffer);
+  public static DeviceViewIntoNode deserialize(ByteBuffer byteBuffer) {
+    DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor =
+        DeviceViewIntoPathDescriptor.deserialize(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new LimitNode(planNodeId, limit);
+    return new DeviceViewIntoNode(planNodeId, deviceViewIntoPathDescriptor);
   }
 
   @Override
-  public String toString() {
-    return "LimitNode-" + this.getPlanNodeId();
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitDeviceViewInto(this, context);
   }
 
   @Override
@@ -125,12 +97,17 @@ public class LimitNode extends ProcessNode {
     if (!super.equals(o)) {
       return false;
     }
-    LimitNode that = (LimitNode) o;
-    return limit == that.limit && child.equals(that.child);
+    DeviceViewIntoNode intoNode = (DeviceViewIntoNode) o;
+    return deviceViewIntoPathDescriptor.equals(intoNode.deviceViewIntoPathDescriptor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), limit, child);
+    return Objects.hash(super.hashCode(), deviceViewIntoPathDescriptor);
+  }
+
+  @Override
+  public String toString() {
+    return "DeviceViewIntoNode-" + getPlanNodeId();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
index 034db109de..8517904d21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceViewNode.java
@@ -42,7 +42,7 @@ import java.util.Objects;
  * same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will
  * contain n+1 columns where the new column is Device column.
  */
-public class DeviceViewNode extends MultiChildNode {
+public class DeviceViewNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by device and time.
   // The size of this list is 2 and the first SortItem in this list has higher priority.
@@ -95,21 +95,6 @@ public class DeviceViewNode extends MultiChildNode {
     return deviceToMeasurementIndexesMap;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new DeviceViewNode(
@@ -224,7 +209,6 @@ public class DeviceViewNode extends MultiChildNode {
     DeviceViewNode that = (DeviceViewNode) o;
     return mergeOrderParameter.equals(that.mergeOrderParameter)
         && devices.equals(that.devices)
-        && children.equals(that.children)
         && outputColumnNames.equals(that.outputColumnNames)
         && deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
   }
@@ -235,7 +219,6 @@ public class DeviceViewNode extends MultiChildNode {
         super.hashCode(),
         mergeOrderParameter,
         devices,
-        children,
         outputColumnNames,
         deviceToMeasurementIndexesMap);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
index 780a0e629f..0a4b64f3de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ExchangeNode.java
@@ -28,8 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,8 +35,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class ExchangeNode extends PlanNode {
-  private PlanNode child;
+public class ExchangeNode extends SingleChildProcessNode {
+
   // The remoteSourceNode is used to record the remote source info for current ExchangeNode
   // It is not the child of current ExchangeNode
   private FragmentSinkNode remoteSourceNode;
@@ -56,24 +54,11 @@ public class ExchangeNode extends PlanNode {
     super(id);
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    if (this.child == null) {
-      return ImmutableList.of();
-    }
-    return ImmutableList.of(child);
-  }
-
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitExchange(this, context);
   }
 
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
   @Override
   public PlanNode clone() {
     ExchangeNode node = new ExchangeNode(getPlanNodeId());
@@ -85,11 +70,6 @@ public class ExchangeNode extends PlanNode {
     return node;
   }
 
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public List<String> getOutputColumnNames() {
     return outputColumnNames;
@@ -150,14 +130,6 @@ public class ExchangeNode extends PlanNode {
     }
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
-  public void setChild(PlanNode child) {
-    this.child = child;
-  }
-
   @Override
   public String toString() {
     return String.format(
@@ -182,10 +154,6 @@ public class ExchangeNode extends PlanNode {
     this.setOutputColumnNames(remoteSourceNode.getOutputColumnNames());
   }
 
-  public void cleanChildren() {
-    this.child = null;
-  }
-
   public TEndPoint getUpstreamEndpoint() {
     return upstreamEndpoint;
   }
@@ -210,15 +178,13 @@ public class ExchangeNode extends PlanNode {
       return false;
     }
     ExchangeNode that = (ExchangeNode) o;
-    return Objects.equals(child, that.child)
-        && Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
+    return Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
         && Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
         && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(
-        super.hashCode(), child, upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
+    return Objects.hash(super.hashCode(), upstreamEndpoint, upstreamInstanceId, upstreamPlanNodeId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
index 5acb18a9de..d177d98d9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FillNode.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -35,48 +33,24 @@ import java.util.List;
 import java.util.Objects;
 
 /** FillNode is used to fill the empty field in one row. */
-public class FillNode extends ProcessNode {
+public class FillNode extends SingleChildProcessNode {
 
   // descriptions of how null values are filled
-  private FillDescriptor fillDescriptor;
-
-  private Ordering scanOrder;
-
-  private PlanNode child;
+  private final FillDescriptor fillDescriptor;
 
-  public FillNode(PlanNodeId id) {
-    super(id);
-  }
+  private final Ordering scanOrder;
 
   public FillNode(PlanNodeId id, FillDescriptor fillDescriptor, Ordering scanOrder) {
-    this(id);
+    super(id);
     this.fillDescriptor = fillDescriptor;
     this.scanOrder = scanOrder;
   }
 
   public FillNode(
       PlanNodeId id, PlanNode child, FillDescriptor fillDescriptor, Ordering scanOrder) {
-    this(id, fillDescriptor, scanOrder);
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  public PlanNode getChild() {
-    return child;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+    super(id, child);
+    this.fillDescriptor = fillDescriptor;
+    this.scanOrder = scanOrder;
   }
 
   @Override
@@ -127,14 +101,12 @@ public class FillNode extends ProcessNode {
       return false;
     }
     FillNode that = (FillNode) o;
-    return Objects.equals(fillDescriptor, that.fillDescriptor)
-        && Objects.equals(child, that.child)
-        && scanOrder == that.scanOrder;
+    return Objects.equals(fillDescriptor, that.fillDescriptor) && scanOrder == that.scanOrder;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), fillDescriptor, child, scanOrder);
+    return Objects.hash(super.hashCode(), fillDescriptor, scanOrder);
   }
 
   public FillDescriptor getFillDescriptor() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 0fa39ff184..da7b8dc9c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
  * <p>If the group by level parameter is [0, 2], then these two columns will not belong to one
  * bucket. And the total buckets are `root.*.d1.s1` and `root.*.d2.s1`
  */
-public class GroupByLevelNode extends MultiChildNode {
+public class GroupByLevelNode extends MultiChildProcessNode {
 
   // The list of aggregate descriptors
   // each GroupByLevelDescriptor will be output as one or two column of result TsBlock
@@ -87,21 +87,6 @@ public class GroupByLevelNode extends MultiChildNode {
     this.scanOrder = scanOrder;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new GroupByLevelNode(
@@ -191,9 +176,15 @@ public class GroupByLevelNode extends MultiChildNode {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     GroupByLevelNode that = (GroupByLevelNode) o;
     return Objects.equals(groupByLevelDescriptors, that.groupByLevelDescriptors)
         && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index e0ef548508..c2e558bdc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -43,7 +43,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class GroupByTagNode extends MultiChildNode {
+public class GroupByTagNode extends MultiChildProcessNode {
 
   private final List<String> tagKeys;
   private final Map<List<String>, List<CrossSeriesAggregationDescriptor>>
@@ -87,16 +87,6 @@ public class GroupByTagNode extends MultiChildNode {
     this.outputColumnNames = Validate.notNull(outputColumnNames);
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
   @Override
   public PlanNode clone() {
     // TODO: better do deep copy
@@ -109,11 +99,6 @@ public class GroupByTagNode extends MultiChildNode {
         this.outputColumnNames);
   }
 
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public List<String> getOutputColumnNames() {
     List<String> ret = new ArrayList<>(tagKeys);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
index 0407f6d946..b68e862a7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/IntoNode.java
@@ -16,96 +16,71 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
 
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-
-import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
-/**
- * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
- * upstream nodes
- */
-public class OffsetNode extends ProcessNode {
-
-  private final int offset;
+public class IntoNode extends SingleChildProcessNode {
 
-  private PlanNode child;
+  private final IntoPathDescriptor intoPathDescriptor;
 
-  public OffsetNode(PlanNodeId id, int offset) {
+  public IntoNode(PlanNodeId id, IntoPathDescriptor intoPathDescriptor) {
     super(id);
-    this.offset = offset;
+    this.intoPathDescriptor = intoPathDescriptor;
   }
 
-  public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
-    this(id, offset);
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+  public IntoNode(PlanNodeId id, PlanNode child, IntoPathDescriptor intoPathDescriptor) {
+    super(id, child);
+    this.intoPathDescriptor = intoPathDescriptor;
   }
 
   @Override
   public PlanNode clone() {
-    return new OffsetNode(getPlanNodeId(), offset);
+    return new IntoNode(getPlanNodeId(), this.intoPathDescriptor);
   }
 
   @Override
   public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitOffset(this, context);
+    return ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+        .map(ColumnHeader::getColumnName)
+        .collect(Collectors.toList());
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.OFFSET.serialize(byteBuffer);
-    ReadWriteIOUtils.write(offset, byteBuffer);
+    PlanNodeType.INTO.serialize(byteBuffer);
+    this.intoPathDescriptor.serialize(byteBuffer);
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws IOException {
-    PlanNodeType.OFFSET.serialize(stream);
-    ReadWriteIOUtils.write(offset, stream);
+    PlanNodeType.INTO.serialize(stream);
+    this.intoPathDescriptor.serialize(stream);
   }
 
-  public static OffsetNode deserialize(ByteBuffer byteBuffer) {
-    int offset = ReadWriteIOUtils.readInt(byteBuffer);
+  public static IntoNode deserialize(ByteBuffer byteBuffer) {
+    IntoPathDescriptor intoPathDescriptor = IntoPathDescriptor.deserialize(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new OffsetNode(planNodeId, offset);
+    return new IntoNode(planNodeId, intoPathDescriptor);
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
-  public int getOffset() {
-    return offset;
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitInto(this, context);
   }
 
   @Override
@@ -116,12 +91,20 @@ public class OffsetNode extends ProcessNode {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    OffsetNode that = (OffsetNode) o;
-    return offset == that.offset && child.equals(that.child);
+    if (!super.equals(o)) {
+      return false;
+    }
+    IntoNode intoNode = (IntoNode) o;
+    return intoPathDescriptor.equals(intoNode.intoPathDescriptor);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(child, offset);
+    return Objects.hash(super.hashCode(), intoPathDescriptor);
+  }
+
+  @Override
+  public String toString() {
+    return "IntoNode-" + getPlanNodeId();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
index 9d3badac0e..ae03e13f70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -33,54 +31,29 @@ import java.util.List;
 import java.util.Objects;
 
 /** LimitNode is used to select top n result. It uses the default order of upstream nodes */
-public class LimitNode extends ProcessNode {
+public class LimitNode extends SingleChildProcessNode {
 
   private final int limit;
 
-  private PlanNode child;
-
   public LimitNode(PlanNodeId id, int limit) {
     super(id);
     this.limit = limit;
   }
 
   public LimitNode(PlanNodeId id, PlanNode child, int limit) {
-    this(id, limit);
-    this.child = child;
+    super(id, child);
+    this.limit = limit;
   }
 
   public int getLimit() {
     return limit;
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
-  public void setChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
   @Override
   public PlanNode clone() {
     return new LimitNode(getPlanNodeId(), this.limit);
   }
 
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public List<String> getOutputColumnNames() {
     return child.getOutputColumnNames();
@@ -126,11 +99,11 @@ public class LimitNode extends ProcessNode {
       return false;
     }
     LimitNode that = (LimitNode) o;
-    return limit == that.limit && child.equals(that.child);
+    return limit == that.limit;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), limit, child);
+    return Objects.hash(super.hashCode(), limit);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
similarity index 77%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
index 9e699f370c..e23a86d0c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildProcessNode.java
@@ -26,16 +26,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public abstract class MultiChildNode extends ProcessNode {
+public abstract class MultiChildProcessNode extends ProcessNode {
 
   protected List<PlanNode> children;
 
-  public MultiChildNode(PlanNodeId id, List<PlanNode> children) {
+  public MultiChildProcessNode(PlanNodeId id, List<PlanNode> children) {
     super(id);
     this.children = children;
   }
 
-  public MultiChildNode(PlanNodeId id) {
+  public MultiChildProcessNode(PlanNodeId id) {
     super(id);
     this.children = new ArrayList<>();
   }
@@ -44,6 +44,21 @@ public abstract class MultiChildNode extends ProcessNode {
     this.children = children;
   }
 
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    this.children.add(child);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -55,7 +70,7 @@ public abstract class MultiChildNode extends ProcessNode {
     if (!super.equals(o)) {
       return false;
     }
-    MultiChildNode that = (MultiChildNode) o;
+    MultiChildProcessNode that = (MultiChildProcessNode) o;
     return children.equals(that.children);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
index 0407f6d946..64b912303b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -36,35 +34,18 @@ import java.util.Objects;
  * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of
  * upstream nodes
  */
-public class OffsetNode extends ProcessNode {
+public class OffsetNode extends SingleChildProcessNode {
 
   private final int offset;
 
-  private PlanNode child;
-
   public OffsetNode(PlanNodeId id, int offset) {
     super(id);
     this.offset = offset;
   }
 
   public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
-    this(id, offset);
-    this.child = child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+    super(id, child);
+    this.offset = offset;
   }
 
   @Override
@@ -100,10 +81,6 @@ public class OffsetNode extends ProcessNode {
     return new OffsetNode(planNodeId, offset);
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
   public int getOffset() {
     return offset;
   }
@@ -116,12 +93,15 @@ public class OffsetNode extends ProcessNode {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
+    if (!super.equals(o)) {
+      return false;
+    }
     OffsetNode that = (OffsetNode) o;
-    return offset == that.offset && child.equals(that.child);
+    return offset == that.offset;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(child, offset);
+    return Objects.hash(super.hashCode(), offset);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
index 3a9b5a7268..d5e71b6257 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/ProjectNode.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -34,38 +32,20 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
-public class ProjectNode extends ProcessNode {
+public class ProjectNode extends SingleChildProcessNode {
 
   private final List<String> outputColumnNames;
 
-  private PlanNode child;
-
   public ProjectNode(PlanNodeId id, List<String> outputColumnNames) {
     super(id);
     this.outputColumnNames = outputColumnNames;
   }
 
   public ProjectNode(PlanNodeId id, PlanNode child, List<String> outputColumnNames) {
-    super(id);
-    this.child = child;
+    super(id, child);
     this.outputColumnNames = outputColumnNames;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public PlanNode clone() {
     return new ProjectNode(getPlanNodeId(), getOutputColumnNames());
@@ -112,15 +92,21 @@ public class ProjectNode extends ProcessNode {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     ProjectNode that = (ProjectNode) o;
-    return outputColumnNames.equals(that.outputColumnNames) && child.equals(that.child);
+    return outputColumnNames.equals(that.outputColumnNames);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), outputColumnNames, child);
+    return Objects.hash(super.hashCode(), outputColumnNames);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
index 9e699f370c..bd183cf3ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MultiChildNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleChildProcessNode.java
@@ -22,26 +22,52 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 
-import java.util.ArrayList;
+import com.google.common.collect.ImmutableList;
+
 import java.util.List;
 import java.util.Objects;
 
-public abstract class MultiChildNode extends ProcessNode {
+public abstract class SingleChildProcessNode extends ProcessNode {
 
-  protected List<PlanNode> children;
+  protected PlanNode child;
 
-  public MultiChildNode(PlanNodeId id, List<PlanNode> children) {
+  public SingleChildProcessNode(PlanNodeId id) {
     super(id);
-    this.children = children;
   }
 
-  public MultiChildNode(PlanNodeId id) {
+  public SingleChildProcessNode(PlanNodeId id, PlanNode child) {
     super(id);
-    this.children = new ArrayList<>();
+    this.child = child;
+  }
+
+  public PlanNode getChild() {
+    return child;
+  }
+
+  public void setChild(PlanNode child) {
+    this.child = child;
+  }
+
+  public void cleanChildren() {
+    this.child = null;
   }
 
-  public void setChildren(List<PlanNode> children) {
-    this.children = children;
+  @Override
+  public List<PlanNode> getChildren() {
+    if (this.child == null) {
+      return ImmutableList.of();
+    }
+    return ImmutableList.of(child);
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    this.child = child;
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
@@ -55,12 +81,12 @@ public abstract class MultiChildNode extends ProcessNode {
     if (!super.equals(o)) {
       return false;
     }
-    MultiChildNode that = (MultiChildNode) o;
-    return children.equals(that.children);
+    SingleChildProcessNode that = (SingleChildProcessNode) o;
+    return Objects.equals(child, that.child);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), children);
+    return Objects.hash(super.hashCode(), child);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
index a2f4935791..49d470a780 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
@@ -28,8 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -38,7 +36,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class SlidingWindowAggregationNode extends ProcessNode {
+public class SlidingWindowAggregationNode extends SingleChildProcessNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output as one column of
   // result TsBlock
@@ -49,8 +47,6 @@ public class SlidingWindowAggregationNode extends ProcessNode {
 
   protected Ordering scanOrder;
 
-  private PlanNode child;
-
   public SlidingWindowAggregationNode(
       PlanNodeId id,
       List<AggregationDescriptor> aggregationDescriptorList,
@@ -68,8 +64,10 @@ public class SlidingWindowAggregationNode extends ProcessNode {
       List<AggregationDescriptor> aggregationDescriptorList,
       GroupByTimeParameter groupByTimeParameter,
       Ordering scanOrder) {
-    this(id, aggregationDescriptorList, groupByTimeParameter, scanOrder);
-    this.child = child;
+    super(id, child);
+    this.aggregationDescriptorList = aggregationDescriptorList;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = scanOrder;
   }
 
   public List<AggregationDescriptor> getAggregationDescriptorList() {
@@ -88,25 +86,6 @@ public class SlidingWindowAggregationNode extends ProcessNode {
     return scanOrder;
   }
 
-  public PlanNode getChild() {
-    return child;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public PlanNode clone() {
     return new SlidingWindowAggregationNode(
@@ -189,13 +168,12 @@ public class SlidingWindowAggregationNode extends ProcessNode {
     }
     SlidingWindowAggregationNode that = (SlidingWindowAggregationNode) o;
     return Objects.equals(aggregationDescriptorList, that.aggregationDescriptorList)
-        && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
-        && Objects.equals(child, that.child);
+        && Objects.equals(groupByTimeParameter, that.groupByTimeParameter);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), aggregationDescriptorList, groupByTimeParameter, child);
+    return Objects.hash(super.hashCode(), aggregationDescriptorList, groupByTimeParameter);
   }
 
   public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
index 283987eb97..e033fd163d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SortNode.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,9 +35,7 @@ import java.util.Objects;
  * In general, the parameter in sortNode should be pushed down to the upstream operators. In our
  * optimized logical query plan, the sortNode should not appear.
  */
-public class SortNode extends ProcessNode {
-
-  private PlanNode child;
+public class SortNode extends SingleChildProcessNode {
 
   private final Ordering sortOrder;
 
@@ -49,29 +45,14 @@ public class SortNode extends ProcessNode {
   }
 
   public SortNode(PlanNodeId id, PlanNode child, Ordering sortOrder) {
-    this(id, sortOrder);
-    this.child = child;
+    super(id, child);
+    this.sortOrder = sortOrder;
   }
 
   public Ordering getSortOrder() {
     return sortOrder;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public PlanNode clone() {
     return new SortNode(getPlanNodeId(), sortOrder);
@@ -117,11 +98,11 @@ public class SortNode extends ProcessNode {
       return false;
     }
     SortNode sortNode = (SortNode) o;
-    return child.equals(sortNode.child) && sortOrder == sortNode.sortOrder;
+    return sortOrder == sortNode.sortOrder;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), child, sortOrder);
+    return Objects.hash(super.hashCode(), sortOrder);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 5685e6d95b..2598cd4e28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
  * timestamp column. It will join two or more TsBlock by Timestamp column. The output result of
  * TimeJoinOperator is sorted by timestamp
  */
-public class TimeJoinNode extends MultiChildNode {
+public class TimeJoinNode extends MultiChildProcessNode {
 
   // This parameter indicates the order when executing multiway merge sort.
   private final Ordering mergeOrder;
@@ -57,21 +57,6 @@ public class TimeJoinNode extends MultiChildNode {
     return mergeOrder;
   }
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return children;
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.children.add(child);
-  }
-
-  @Override
-  public int allowedChildCount() {
-    return CHILD_COUNT_NO_LIMIT;
-  }
-
   @Override
   public PlanNode clone() {
     return new TimeJoinNode(getPlanNodeId(), getMergeOrder());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
index f6233e2e9d..112e53a01b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableList;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -38,9 +36,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
-public class TransformNode extends ProcessNode {
-
-  protected PlanNode childPlanNode;
+public class TransformNode extends SingleChildProcessNode {
 
   protected final Expression[] outputExpressions;
   protected final boolean keepNull;
@@ -52,13 +48,12 @@ public class TransformNode extends ProcessNode {
 
   public TransformNode(
       PlanNodeId id,
-      PlanNode childPlanNode,
+      PlanNode child,
       Expression[] outputExpressions,
       boolean keepNull,
       ZoneId zoneId,
       Ordering scanOrder) {
-    super(id);
-    this.childPlanNode = childPlanNode;
+    super(id, child);
     this.outputExpressions = outputExpressions;
     this.keepNull = keepNull;
     this.zoneId = zoneId;
@@ -78,21 +73,6 @@ public class TransformNode extends ProcessNode {
     this.scanOrder = scanOrder;
   }
 
-  @Override
-  public final List<PlanNode> getChildren() {
-    return ImmutableList.of(childPlanNode);
-  }
-
-  @Override
-  public final void addChild(PlanNode childPlanNode) {
-    this.childPlanNode = childPlanNode;
-  }
-
-  @Override
-  public final int allowedChildCount() {
-    return ONE_CHILD;
-  }
-
   @Override
   public final List<String> getOutputColumnNames() {
     if (outputColumnNames == null) {
@@ -185,7 +165,6 @@ public class TransformNode extends ProcessNode {
     }
     TransformNode that = (TransformNode) o;
     return keepNull == that.keepNull
-        && childPlanNode.equals(that.childPlanNode)
         && Arrays.equals(outputExpressions, that.outputExpressions)
         && zoneId.equals(that.zoneId)
         && scanOrder == that.scanOrder;
@@ -193,7 +172,7 @@ public class TransformNode extends ProcessNode {
 
   @Override
   public int hashCode() {
-    int result = Objects.hash(super.hashCode(), childPlanNode, keepNull, zoneId, scanOrder);
+    int result = Objects.hash(super.hashCode(), keepNull, zoneId, scanOrder);
     result = 31 * result + Arrays.hashCode(outputExpressions);
     return result;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
index a81a5fb455..5f38f64a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryCollectNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -32,7 +32,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class LastQueryCollectNode extends MultiChildNode {
+public class LastQueryCollectNode extends MultiChildProcessNode {
 
   public LastQueryCollectNode(PlanNodeId id) {
     super(id);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
index 482fc0110d..accb4dcbd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryMergeNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 
 import java.io.DataOutputStream;
@@ -33,7 +33,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class LastQueryMergeNode extends MultiChildNode {
+public class LastQueryMergeNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by sensor and time.
   // The size of this list is 2 and the first SortItem in this list has higher priority.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
index 23b40ca829..b8e1cf422f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/last/LastQueryNode.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
@@ -38,7 +38,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS;
 
-public class LastQueryNode extends MultiChildNode {
+public class LastQueryNode extends MultiChildProcessNode {
 
   private final Filter timeFilter;