You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/14 05:53:42 UTC

[iotdb] 10/11: complete basic definition of all nodes

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fe5d9d228ebff8d3dc5ef0739fcfbf8e282ffd99
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 22:18:43 2022 +0800

    complete basic definition of all nodes
---
 .../query/distribution/common/LevelBucketInfo.java | 15 ------
 .../common/{TraversalOrder.java => OrderBy.java}   |  2 +-
 .../distribution/common/SeriesBatchAggInfo.java    | 21 --------
 .../query/distribution/common/SeriesBatchData.java | 14 -----
 .../cluster/query/distribution/plan/PlanNode.java  |  7 ++-
 .../query/distribution/plan/PlanNodeId.java        | 34 ++++++++++++
 .../distribution/plan/process/DeviceMergeNode.java | 40 ++++++++++----
 .../query/distribution/plan/process/FillNode.java  | 17 ++++--
 .../distribution/plan/process/FilterNode.java      | 19 ++++---
 .../plan/process/GroupByLevelNode.java             | 34 ++++++------
 .../query/distribution/plan/process/LimitNode.java | 17 ++++--
 .../distribution/plan/process/OffsetNode.java      | 17 ++++--
 .../distribution/plan/process/ProcessNode.java     |  7 ++-
 .../plan/process/SeriesAggregateNode.java          | 36 -------------
 .../query/distribution/plan/process/SortNode.java  | 23 +++++---
 .../distribution/plan/process/TimeJoinNode.java    | 40 +++++++++++---
 .../distribution/plan/process/WithoutNode.java     | 18 ++++---
 .../query/distribution/plan/sink/CsvSinkNode.java  | 13 ++++-
 .../distribution/plan/sink/FragmentSinkNode.java   | 20 +++++++
 .../query/distribution/plan/sink/SinkNode.java     | 10 +++-
 .../distribution/plan/sink/ThriftSinkNode.java     | 16 +++++-
 .../distribution/plan/source/CsvSourceNode.java    | 11 +++-
 .../plan/source/SeriesAggregateNode.java           | 63 ++++++++++++++++++++++
 .../distribution/plan/source/SeriesScanNode.java   | 45 ++++++++++------
 .../query/distribution/plan/source/SourceNode.java |  8 ++-
 25 files changed, 363 insertions(+), 184 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java
deleted file mode 100644
index 44ee37d..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.common;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class is used to store all the buckets for the GroupByLevelOperator
- * It stores the levels index and all the enumerated values in each level by a HashMap
- * Using the HashMap, the operator could calculate all the buckets using combination of values from each level
- */
-public class LevelBucketInfo {
-    // eg: If the clause is `group by level = 1, 2, 3`, the map should be like
-    // map{1 -> ['a', 'b'], 2 -> ['aa', 'bb'], 3 -> ['aaa', 'bbb']}
-    private Map<Integer, List<String>> levelMap;
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java
similarity index 87%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java
index ea88253..07f005f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java
@@ -3,7 +3,7 @@ package org.apache.iotdb.cluster.query.distribution.common;
 /**
  * The traversal order for operators by timestamp
  */
-public enum TraversalOrder {
+public enum OrderBy {
     TIMESTAMP_ASC,
     TIMESTAMP_DESC,
     DEVICE_NAME_ASC,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java
deleted file mode 100644
index c1726d5..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.common;
-
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-
-/**
- * SeriesBatchAggInfo is the "batch" result of SeriesAggregateOperator when its getNextBatch() is invoked.
- */
-public class SeriesBatchAggInfo {
-    // Path of the series.
-    // Path will be used in the downstream operators.
-    // GroupByLevelOperator will use it to divide the data into different buckets to do the rollup operation.
-    private Path path;
-
-    // Time range of current statistic.
-    private TimeRange timeRange;
-
-    // Statistics for the series in current time range
-    private Statistics<?> statistics;
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java
deleted file mode 100644
index 3384d49..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.common;
-
-import org.apache.iotdb.tsfile.read.common.BatchData;
-
-/**
- * @author xingtanzjr
- * TODO: currently we only use it to describe the result set of SeriesScanOperator
- * The BatchData is suitable as the encapsulation of part of result set of SeriesScanOperator
- * BatchData is the class defined and generally used in single-node IoTDB
- * We leverage it as the `batch` here. We can consider a more general name or make some modifications for it.
- */
-public class SeriesBatchData extends BatchData {
-
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
index 527b697..d4b763d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
@@ -7,6 +7,9 @@ import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
  * The base class of query executable operators, which is used to compose logical query plan.
  * TODO: consider how to restrict the children type for each type of ExecOperator
  */
-public abstract class PlanNode<T> extends TreeNode<PlanNode<?>> {
-
+public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
+    private PlanNodeId id;
+    public PlanNode(PlanNodeId id) {
+        this.id = id;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java
new file mode 100644
index 0000000..b9d2888
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.iotdb.cluster.query.distribution.plan;
+
+public class PlanNodeId {
+    private String id;
+    public PlanNodeId(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    @Override
+    public String toString() {
+        return this.id;
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
index fbadebe..9cd2549 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
@@ -1,8 +1,10 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.OrderBy;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 import java.util.List;
 import java.util.Map;
@@ -11,18 +13,36 @@ import java.util.Map;
  * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And output the result with
  * specific order. The order could be 'order by device' or 'order by timestamp'
  *
- * The types of involved devices should be same. If the device contains n series, the device-based view will contain n+2
- * columns, which are timestamp column, device name column and n value columns of involved series.
+ * Each output from its children should have the same schema. That means the columns should be the same across these TsBlocks.
+ * If the input TsBlock contains n columns, the device-based view will contain n+1 columns, where the new column is the Device
+ * column.
  *
- * Children type: [TimeJoinOperator]
  */
-public class DeviceMergeNode extends ProcessNode<TsBlock> {
+public class DeviceMergeNode extends ProcessNode {
     // The result output order that this operator
-    private TraversalOrder mergeOrder;
+    private OrderBy mergeOrder;
 
-    // Owned devices
-    private List<String> ownedDeviceNameList;
+    // The policy to decide whether a row should be discarded
+    // The without policy can be pushed down to the DeviceMergeNode because we can know whether a row contains
+    // null or not.
+    private WithoutPolicy withoutPolicy;
 
-    // The map from deviceName to corresponding query result operator responsible for that device.
-    private Map<String, TimeJoinNode> upstreamMap;
+    // The map from deviceName to corresponding query result node responsible for that device.
+    // DeviceNode means the node whose output TsBlock contains the data belonging to one device.
+    private Map<String, PlanNode<TsBlock>> childDeviceNodeMap;
+
+    public DeviceMergeNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) {
+        this(id);
+        this.childDeviceNodeMap = deviceNodeMap;
+        this.children.addAll(deviceNodeMap.values());
+    }
+
+    public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) {
+        this.childDeviceNodeMap.put(deviceName, childNode);
+        this.children.add(childNode);
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
index 8b08640..430948d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
@@ -3,14 +3,23 @@ package org.apache.iotdb.cluster.query.distribution.plan.process;
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * FillOperator is used to fill the empty field in one row.
+ * FillNode is used to fill the empty field in one row.
  *
- * Children type: [All the operators whose result set is Tablet]
  */
-public class FillNode extends ProcessNode<TsBlock> {
+public class FillNode extends ProcessNode {
 
-    // The policy to discard the result from upstream operator
+    // The policy to discard the result from upstream node
     private FillPolicy fillPolicy;
+
+    public FillNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
+        this(id);
+        this.fillPolicy = fillPolicy;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
index a579d4e..e7a4fce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
@@ -1,17 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
 /**
- * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node IoTDB)
- * The FilterExecOperator is responsible to filter the RowRecord from Tablet.
- *
- * Children type: [All the operators whose result set is Tablet]
+ * The FilterNode is responsible for filtering the RowRecord from TsBlock.
  */
-public class FilterNode extends ProcessNode<TsBlock> {
+public class FilterNode extends ProcessNode {
 
     // The filter
     private FilterOperator rowFilter;
+
+    public FilterNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public FilterNode(PlanNodeId id, FilterOperator rowFilter) {
+        this(id);
+        this.rowFilter = rowFilter;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
index 2007900..54ff8eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
@@ -1,25 +1,25 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * This operator is responsible for the final aggregation merge operation.
- * It will arrange the data by time range firstly. And inside each time range, the data from same measurement and
- * different devices will be rolled up by corresponding level into different buckets.
- * If the bucketInfo is empty, the data from `same measurement and different devices` won't be rolled up.
- * If the groupByTimeParameter is null, the data won't be split by time range.
- *
- * Children type: [SeriesAggregateOperator]
+ * This node is responsible for the final aggregation merge operation.
+ * It will process the data from TsBlock row by row.
+ * For one row, it will roll up the fields which have the same aggregate function and belong to one bucket.
+ * Here, two columns belong to one bucket when the partial paths of their devices, after rolling up at the specified
+ * levels, are the same.
+ * For example, let's say there are two columns `root.sg.d1.s1` and `root.sg.d2.s1`.
+ * If the group by level parameter is [0, 1], then these two columns will belong to one bucket and the bucket name
+ * is `root.sg.*.s1`.
+ * If the group by level parameter is [0, 2], then these two columns will not belong to one bucket. And the total buckets
+ * are `root.*.d1.s1` and `root.*.d2.s1`
  */
-public class GroupByLevelNode extends ProcessNode<TsBlock> {
+public class GroupByLevelNode extends ProcessNode {
 
-    // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
-    private LevelBucketInfo bucketInfo;
+    private int[] groupByLevels;
 
-    // The parameter of `group by time`
-    // The GroupByLevelOperator also need GroupByTimeParameter
-    private GroupByTimeParameter groupByTimeParameter;
+    public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) {
+        super(id);
+        this.groupByLevels = groupByLevels;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
index e675086..63d4370 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
@@ -1,15 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * LimitOperator is used to select top n result. It uses the default order of upstream operators
+ * LimitNode is used to select the top n results. It uses the default order of upstream nodes.
  *
- * Children type: [All the operators whose result set is Tablet]
  */
-public class LimitNode extends ProcessNode<TsBlock> {
+public class LimitNode extends ProcessNode {
 
     // The limit count
     private int limit;
+
+    public LimitNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public LimitNode(PlanNodeId id, int limit) {
+        this(id);
+        this.limit = limit;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
index 78ac9eb..3ee19ab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
@@ -1,15 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
+ * OffsetNode is used to skip the top n results from upstream nodes. It uses the default order of upstream nodes.
  *
- * Children type: [All the operators whose result set is Tablet]
  */
-public class OffsetNode extends ProcessNode<TsBlock> {
+public class OffsetNode extends ProcessNode {
 
     // The limit count
     private int offset;
+
+    public OffsetNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public OffsetNode(PlanNodeId id, int offset) {
+        this(id);
+        this.offset = offset;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
index e7bffae..d5165b0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
@@ -1,6 +1,11 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-public class ProcessNode<T> extends PlanNode<T> {
+public class ProcessNode extends PlanNode<TsBlock> {
+    public ProcessNode(PlanNodeId id) {
+        super(id);
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
deleted file mode 100644
index 46ef991..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.plan.process;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-
-/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
- * This operator will split data in one series into many groups by time range and do the aggregation calculation for each
- * group.
- * If there is no split parameter, it will return one result which is the aggregation result of all data in current series.
- *
- * Children type: [SeriesScanOperator]
- */
-public class SeriesAggregateNode extends ProcessNode<TsBlock> {
-
-    // The parameter of `group by time`
-    // Its value will be null if there is no `group by time` clause,
-    private GroupByTimeParameter groupByTimeParameter;
-
-    // TODO: need consider how to represent the aggregation function and corresponding implementation
-    // We use a String to indicate the parameter temporarily
-    private String aggregationFunc;
-
-    // This method will only be invoked by SeriesAggregateOperator
-    // It will return the statistics of the series in given time range
-    // When calculate the statistics, the operator should use the most optimized way to do that. In
-    // other words, using
-    // raw data is the final way to do that.
-    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
index a718247..705a4c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
@@ -1,15 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.common.OrderBy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * In general, the parameter in sortOperator should be pushed down to the upstream operators.
- * In our optimized logical query plan, the sortOperator should not appear.
+ * In general, the parameter in sortNode should be pushed down to the upstream operators.
+ * In our optimized logical query plan, the sortNode should not appear.
  */
-public class SortNode extends ProcessNode<TsBlock> {
+public class SortNode extends ProcessNode {
 
-    private TraversalOrder sortOrder;
-    
+    private OrderBy sortOrder;
+
+    public SortNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public SortNode(PlanNodeId id, OrderBy sortOrder) {
+        this(id);
+        this.sortOrder = sortOrder;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
index 08a7617..ffbeb29 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
@@ -1,24 +1,48 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.OrderBy;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+
+import java.util.Arrays;
 
 /**
- * TimeJoinOperator is responsible for join two or more series.
- * The join algorithm is like outer join by timestamp column.
+ * TimeJoinOperator is responsible for joining two or more TsBlocks.
+ * The join algorithm is like an outer join on the timestamp column. It will join two or more TsBlocks by the timestamp column.
  * The output result of TimeJoinOperator is sorted by timestamp
- *
- * Children type: [SeriesScanOperator]
  */
-public class TimeJoinNode extends ProcessNode<TsBlock> {
+//TODO: define the TimeJoinMergeNode for distributed plan
+public class TimeJoinNode extends ProcessNode {
 
     // This parameter indicates the order when executing multiway merge sort.
-    private TraversalOrder mergeOrder;
+    private OrderBy mergeOrder;
 
     // The policy to decide whether a row should be discarded
     // The without policy is able to be push down to the TimeJoinOperator because we can know whether a row contains
-    // null or not in this operator the situation won't be changed by the downstream operators.
+    // null or not.
     private WithoutPolicy withoutPolicy;
+
+    public TimeJoinNode(PlanNodeId id) {
+        super(id);
+        this.mergeOrder = OrderBy.TIMESTAMP_ASC;
+    }
+
+    public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) {
+        super(id);
+        this.children.addAll(Arrays.asList(children));
+    }
+
+    public void addChild(PlanNode<TsBlock> child) {
+        this.children.add(child);
+    }
+
+    public void setMergeOrder(OrderBy mergeOrder) {
+        this.mergeOrder = mergeOrder;
+    }
+
+    public void setWithoutPolicy(WithoutPolicy withoutPolicy) {
+        this.withoutPolicy = withoutPolicy;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
index b3f1d9e..6afc25b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
@@ -1,16 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * WithoutOperator is used to discard specific result from upstream operators.
- *
- * Children type: [All the operators whose result set is Tablet]
+ * WithoutNode is used to discard specific rows from upstream node.
  */
-public class WithoutNode extends ProcessNode<TsBlock> {
+public class WithoutNode extends ProcessNode {
 
     // The policy to discard the result from upstream operator
     private WithoutPolicy discardPolicy;
+
+    public WithoutNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public WithoutNode(PlanNodeId id, WithoutPolicy discardPolicy) {
+        this(id);
+        this.discardPolicy = discardPolicy;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
index 3e9dc4c..6bafca7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
@@ -19,11 +19,20 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+
+public class CsvSinkNode extends SinkNode {
+  public CsvSinkNode(PlanNodeId id) {
+    super(id);
+  }
 
-public class CsvSinkNode extends SinkNode<SeriesBatchData> {
   @Override
   public void close() throws Exception {
 
   }
+
+  @Override
+  public void send() {
+
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java
new file mode 100644
index 0000000..348c00d
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java
@@ -0,0 +1,20 @@
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
+
+
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+
+public class FragmentSinkNode extends SinkNode {
+    public FragmentSinkNode(PlanNodeId id) {
+        super(id);
+    }
+
+    @Override
+    public void send() {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
index a94a0ff..de8ab9a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
@@ -19,9 +19,15 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-// 构建与客户端的联系。
-public abstract class SinkNode<T> extends PlanNode<T> implements AutoCloseable {
+public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable {
 
+    public SinkNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public abstract void send();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
index beed5a0..17db1f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
@@ -19,10 +19,22 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-public class ThriftSinkNode extends SinkNode<SeriesBatchData> {
+/**
+ * Not implemented in the current IoTDB yet.
+ */
+public class ThriftSinkNode extends SinkNode {
+
+  public ThriftSinkNode(PlanNodeId id) {
+    super(id);
+  }
 
   @Override
   public void close() throws Exception {}
+
+  @Override
+  public void send() {
+    // Empty body: no send behavior is defined here yet.
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
index 85f402a..5ff967e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
@@ -19,9 +19,16 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-public class CsvSourceNode extends SourceNode<SeriesBatchData> {
+/**
+ * Not implemented in current version.
+ */
+public class CsvSourceNode extends SourceNode {
+
+  public CsvSourceNode(PlanNodeId id) {
+    super(id);
+  }
 
   @Override
   public void close() throws Exception {}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java
new file mode 100644
index 0000000..0a7de03
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java
@@ -0,0 +1,63 @@
+package org.apache.iotdb.cluster.query.distribution.plan.source;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * SeriesAggregateNode is responsible for the aggregation calculation of one series. It reads the
+ * target series and calculates the aggregation result from the aggregation digest or raw data of this series.
+ *
+ * <p>The aggregation result is represented as a TsBlock.
+ *
+ * <p>This node splits the data of the target series into groups by time range and does the aggregation calculation
+ * for each group. Each result is one row of the result TsBlock. The timestamp of each row is the start time of the
+ * time range group.
+ *
+ * <p>If there is no time range split parameter, the result TsBlock contains only one row, which represents the whole
+ * aggregation result of this series. The timestamp of that row is 0, which is meaningless.
+ */
+public class SeriesAggregateNode extends SourceNode {
+
+    // The parameter of `group by time`.
+    // Its value will be null if there is no `group by time` clause.
+    private GroupByTimeParameter groupByTimeParameter;
+
+    // The aggregation function, which contains the function name and the related series.
+    // (Currently only one series is supported in the aggregation function.)
+    // TODO: consider whether FunctionExpression is a suitable representation for the aggregation function.
+    private FunctionExpression aggregateFunc;
+
+    private Filter filter;
+
+    public SeriesAggregateNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) {
+        this(id);
+        this.aggregateFunc = aggregateFunc;
+    }
+
+    public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
+        this(id, aggregateFunc);
+        this.groupByTimeParameter = groupByTimeParameter;
+    }
+
+    @Override
+    public void open() throws Exception {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+    // This method is used during predicate push-down.
+    // The filter is not a constructor parameter because it only becomes known in the predicate push-down stage.
+    public void setFilter(Filter filter) {
+        this.filter = filter;
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
index 7ff8978..738a6ea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
@@ -1,24 +1,18 @@
 package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.cluster.query.distribution.common.OrderBy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 /**
- * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific
- * series. When reading data, the SeriesScanOperator can read the raw data batch by batch. And also,
- * it can leverage the filter and other info to decrease the result set. Besides, the
- * SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
- * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated
- * statistic or has overlap with pre-aggregated statistic, the SeriesScanOperator will read the raw
- * data and calculate the aggregation result for specific time range.
+ * SeriesScanNode is responsible for reading the data of a specific series. When reading data, the SeriesScanNode
+ * can read the raw data batch by batch. It can also leverage the filter and other info to decrease the
+ * result set.
  *
- * <p>Children type: []
+ * <p>Children type: no child is allowed for SeriesScanNode
  */
-public class SeriesScanNode extends SourceNode<TsBlock> {
+public class SeriesScanNode extends SourceNode {
 
   // The path of the target series which will be scanned.
   private Path seriesPath;
@@ -26,20 +20,41 @@ public class SeriesScanNode extends SourceNode<TsBlock> {
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-  private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
+  private OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
 
   // Filter data in current series.
   private Filter filter;
 
   // Limit for result set. The default value is -1, which means no limit
-  private int limit = -1;
+  private int limit; // NOTE(review): now defaults to 0, but the comment above still says -1 (no limit) - confirm intent
 
   // offset for result set. The default value is 0
   private int offset;
 
+  public SeriesScanNode(PlanNodeId id, Path seriesPath) {
+    super(id);
+    this.seriesPath = seriesPath;
+  }
+
+  public void setFilter(Filter filter) {
+    this.filter = filter;
+  }
+
   @Override
   public void close() throws Exception {}
 
   @Override
   public void open() throws Exception {}
+
+  public void setScanOrder(OrderBy scanOrder) {
+    this.scanOrder = scanOrder;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  public void setOffset(int offset) {
+    this.offset = offset;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
index ed0e39a..b72d0f7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
@@ -19,9 +19,15 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.source;
 
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-public abstract class SourceNode<T> extends PlanNode<T> implements AutoCloseable{
+public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable{
+  // Passes the node id straight through to the PlanNode constructor.
+  public SourceNode(PlanNodeId id) {
+    super(id);
+  }
 
   public abstract void open() throws Exception;
 }