You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/10 11:46:21 UTC

[iotdb] branch xingtanzjr/operator-design updated (0b279d1 -> d8f2e1f)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/operator-design
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from 0b279d1  move query distribution folder to cluster
     new 1d7b20e  rename operator to node
     add 60cd7f7  design of source, sink and internal operators
     add bdbd38e  design of source, sink and internal operators
     new ed432ec  Merge branch 'suyurong/operator-design' into xingtanzjr/operator-design
     new d8f2e1f  align class name

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cluster/query/distribution/common/Tablet.java  | 29 -----------
 .../cluster/query/distribution/common/TsBlock.java | 30 ++++++++++++
 .../{TabletMetadata.java => TsBlockMetadata.java}  |  2 +-
 .../query/distribution/operator/ExecOperator.java  | 18 -------
 .../query/distribution/operator/FillOperator.java  | 25 ----------
 .../query/distribution/operator/LimitOperator.java | 24 ---------
 .../distribution/operator/OffsetOperator.java      | 24 ---------
 .../distribution/operator/SeriesScanOperator.java  | 57 ----------------------
 .../query/distribution/operator/SortOperator.java  | 23 ---------
 .../cluster/query/distribution/plan/PlanNode.java  | 12 +++++
 .../process/DeviceMergeNode.java}                  | 22 ++-------
 .../query/distribution/plan/process/FillNode.java  | 16 ++++++
 .../process/FilterNode.java}                       | 17 ++-----
 .../process/GroupByLevelNode.java}                 | 17 ++-----
 .../query/distribution/plan/process/LimitNode.java | 15 ++++++
 .../distribution/plan/process/OffsetNode.java      | 15 ++++++
 .../distribution/plan/process/ProcessNode.java     |  6 +++
 .../process/SeriesAggregateNode.java}              | 21 ++++----
 .../query/distribution/plan/process/SortNode.java  | 15 ++++++
 .../process/TimeJoinNode.java}                     | 17 ++-----
 .../process/WithoutNode.java}                      | 17 ++-----
 .../query/distribution/plan/sink/CsvSinkNode.java  |  9 ++--
 .../distribution/plan/sink/SinkNode.java}          | 13 +++--
 .../distribution/plan/sink/ThriftSinkNode.java     | 10 ++--
 .../distribution/plan/source/CsvSourceNode.java    | 19 +++-----
 .../distribution/plan/source/SeriesScanNode.java   | 45 +++++++++++++++++
 .../query/distribution/plan/source/SourceNode.java | 11 ++---
 27 files changed, 217 insertions(+), 312 deletions(-)
 delete mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlock.java
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/{TabletMetadata.java => TsBlockMetadata.java} (96%)
 delete mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
 delete mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
 delete mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
 delete mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
 delete mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
 delete mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/{operator/DeviceMergeOperator.java => plan/process/DeviceMergeNode.java} (57%)
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/{operator/FilterExecOperator.java => plan/process/FilterNode.java} (52%)
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/{operator/GroupByLevelOperator.java => plan/process/GroupByLevelNode.java} (73%)
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/{operator/SeriesAggregateOperator.java => plan/process/SeriesAggregateNode.java} (55%)
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/{operator/TimeJoinOperator.java => plan/process/TimeJoinNode.java} (71%)
 rename cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/{operator/WithoutOperator.java => plan/process/WithoutNode.java} (50%)
 copy server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFAsin.java => cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java (77%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/{server/service/package-info.java => query/distribution/plan/sink/SinkNode.java} (78%)
 copy server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFAcos.java => cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java (77%)
 copy tsfile/src/main/java/org/apache/iotdb/tsfile/read/filter/factory/FilterType.java => cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java (73%)
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
 copy server/src/main/java/org/apache/iotdb/db/service/metrics/MetricsServiceMBean.java => cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java (75%)

[iotdb] 01/03: rename operator to node

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/operator-design
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1d7b20e5e89838f156ffa970f070ed19f264518f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 19:19:27 2022 +0800

    rename operator to node
---
 .../cluster/query/distribution/common/Tablet.java  | 29 ---------------------
 .../cluster/query/distribution/common/TsBlock.java | 30 ++++++++++++++++++++++
 .../{TabletMetadata.java => TsBlockMetadata.java}  |  2 +-
 ...viceMergeOperator.java => DeviceMergeNode.java} |  8 +++---
 .../operator/{FillOperator.java => FillNode.java}  |  6 ++---
 ...FilterExecOperator.java => FilterExecNode.java} |  6 ++---
 ...pByLevelOperator.java => GroupByLevelNode.java} |  6 ++---
 .../{LimitOperator.java => LimitNode.java}         |  6 ++---
 .../{OffsetOperator.java => OffsetNode.java}       |  6 ++---
 .../operator/{ExecOperator.java => PlanNode.java}  |  2 +-
 ...egateOperator.java => SeriesAggregateNode.java} |  2 +-
 ...SeriesScanOperator.java => SeriesScanNode.java} |  2 +-
 .../operator/{SortOperator.java => SortNode.java}  |  6 ++---
 .../{TimeJoinOperator.java => TimeJoinNode.java}   |  6 ++---
 .../{WithoutOperator.java => WithoutNode.java}     |  6 ++---
 15 files changed, 62 insertions(+), 61 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java
deleted file mode 100644
index dde6fa7..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.common;
-
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-/**
- * Intermediate result for most of ExecOperators.
- * The Tablet contains data from one or more series and constructs them as a row based view
- * The Tablet constructed with n series has n+1 columns where one column is timestamp and the other n columns are values
- * from each series.
- * The Tablet also contains the metadata of owned series.
- *
- * TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class Tablet {
-
-    private TabletMetadata metadata;
-
-    public boolean hasNext() {
-        return false;
-    }
-
-    public RowRecord getNext() {
-        return null;
-    }
-
-    public TabletMetadata getMetadata() {
-        return metadata;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlock.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlock.java
new file mode 100644
index 0000000..73fb008
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlock.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.cluster.query.distribution.common;
+
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+/**
+ * Intermediate result for most of the PlanNodes.
+ * The TsBlock contains data from one or more columns and constructs them as a row-based view.
+ * The columns can be series, aggregation results for one series, or scalar values (such as deviceName).
+ * The TsBlock also contains the metadata to describe the columns.
+ *
+ * TODO: consider the detailed data store model in memory. (using column based or row based ?)
+ */
+public class TsBlock {
+
+    // Describe the column info
+    private TsBlockMetadata metadata;
+
+    public boolean hasNext() {
+        return false;
+    }
+
+    // Get the next row in the current TsBlock
+    public RowRecord getNext() {
+        return null;
+    }
+
+    public TsBlockMetadata getMetadata() {
+        return metadata;
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlockMetadata.java
similarity index 96%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlockMetadata.java
index cdf827e..cd95d82 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlockMetadata.java
@@ -2,7 +2,7 @@ package org.apache.iotdb.cluster.query.distribution.common;
 
 import java.util.List;
 
-public class TabletMetadata {
+public class TsBlockMetadata {
     // list of all columns in current Tablet
     // The column list not only contains the series column, but also contains other column to construct the final result
     // set such as timestamp and deviceName
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
similarity index 85%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
index 1800179..ff831d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
@@ -1,6 +1,6 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 
 import java.util.List;
@@ -15,7 +15,7 @@ import java.util.Map;
  *
  * Children type: [TimeJoinOperator]
  */
-public class DeviceMergeOperator extends ExecOperator<Tablet> {
+public class DeviceMergeNode extends PlanNode<TsBlock> {
     // The result output order that this operator
     private TraversalOrder mergeOrder;
 
@@ -23,7 +23,7 @@ public class DeviceMergeOperator extends ExecOperator<Tablet> {
     private List<String> ownedDeviceNameList;
 
     // The map from deviceName to corresponding query result operator responsible for that device.
-    private Map<String, TimeJoinOperator> upstreamMap;
+    private Map<String, TimeJoinNode> upstreamMap;
 
     @Override
     public boolean hasNext() {
@@ -34,7 +34,7 @@ public class DeviceMergeOperator extends ExecOperator<Tablet> {
     // the additional column is `deviceName`
     // And, the `alignedByDevice` in the TabletMetadata will be `true`
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
similarity index 75%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
index 9390e59..a8f0076 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
@@ -1,14 +1,14 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 
 /**
  * FillOperator is used to fill the empty field in one row.
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FillOperator extends ExecOperator<Tablet> {
+public class FillNode extends PlanNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private FillPolicy fillPolicy;
@@ -19,7 +19,7 @@ public class FillOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
similarity index 77%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
index efe4609..0bf0ad4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
@@ -1,6 +1,6 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
 /**
@@ -9,7 +9,7 @@ import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FilterExecOperator extends ExecOperator<Tablet> {
+public class FilterExecNode extends PlanNode<TsBlock> {
 
     // The filter
     private FilterOperator rowFilter;
@@ -20,7 +20,7 @@ public class FilterExecOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
similarity index 87%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
index 729c84f..38448d5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
@@ -2,7 +2,7 @@ package org.apache.iotdb.cluster.query.distribution.operator;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 
 /**
  * This operator is responsible for the final aggregation merge operation.
@@ -13,7 +13,7 @@ import org.apache.iotdb.cluster.query.distribution.common.Tablet;
  *
  * Children type: [SeriesAggregateOperator]
  */
-public class GroupByLevelOperator extends ExecOperator<Tablet> {
+public class GroupByLevelNode extends PlanNode<TsBlock> {
 
     // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
     private LevelBucketInfo bucketInfo;
@@ -28,7 +28,7 @@ public class GroupByLevelOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
similarity index 71%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
index a48e664..875f8c4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
@@ -1,13 +1,13 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 
 /**
  * LimitOperator is used to select top n result. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class LimitOperator extends ExecOperator<Tablet> {
+public class LimitNode extends PlanNode<TsBlock> {
 
     // The limit count
     private int limit;
@@ -18,7 +18,7 @@ public class LimitOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
similarity index 72%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
index 11cd7de..6de44c5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
@@ -1,13 +1,13 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 
 /**
  * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class OffsetOperator extends ExecOperator<Tablet> {
+public class OffsetNode extends PlanNode<TsBlock> {
 
     // The limit count
     private int offset;
@@ -18,7 +18,7 @@ public class OffsetOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
similarity index 88%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
index 883589b..ae30645 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
@@ -7,7 +7,7 @@ import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
  * The base class of query executable operators, which is used to compose logical query plan.
  * TODO: consider how to restrict the children type for each type of ExecOperator
  */
-public abstract class ExecOperator<T> extends TreeNode<ExecOperator<?>> {
+public abstract class PlanNode<T> extends TreeNode<PlanNode<?>> {
 
     // Judge whether current operator has more result
     public abstract boolean hasNext();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
similarity index 93%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
index e566f49..62fb223 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
@@ -11,7 +11,7 @@ import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
  *
  * Children type: [SeriesScanOperator]
  */
-public class SeriesAggregateOperator extends ExecOperator<SeriesBatchAggInfo> {
+public class SeriesAggregateNode extends PlanNode<SeriesBatchAggInfo> {
 
     // The parameter of `group by time`
     // Its value will be null if there is no `group by time` clause,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
similarity index 96%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
index 46077bd..1c8ac3d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
@@ -18,7 +18,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  *
  * Children type: []
  */
-public class SeriesScanOperator extends ExecOperator<SeriesBatchData> {
+public class SeriesScanNode extends PlanNode<SeriesBatchData> {
 
     // The path of the target series which will be scanned.
     private Path seriesPath;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
similarity index 74%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
index ab3f01d..8bcc520 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
@@ -1,13 +1,13 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 
 /**
  * In general, the parameter in sortOperator should be pushed down to the upstream operators.
  * In our optimized logical query plan, the sortOperator should not appear.
  */
-public class SortOperator extends ExecOperator<Tablet> {
+public class SortNode extends PlanNode<TsBlock> {
 
     private TraversalOrder sortOrder;
 
@@ -17,7 +17,7 @@ public class SortOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
similarity index 86%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
index c573495..b293a13 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
@@ -1,6 +1,6 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 
@@ -11,7 +11,7 @@ import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
  *
  * Children type: [SeriesScanOperator]
  */
-public class TimeJoinOperator extends ExecOperator<Tablet> {
+public class TimeJoinNode extends PlanNode<TsBlock> {
 
     // This parameter indicates the order when executing multiway merge sort.
     private TraversalOrder mergeOrder;
@@ -27,7 +27,7 @@ public class TimeJoinOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
similarity index 76%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
index f25d435..c58a367 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
@@ -1,6 +1,6 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 
 /**
@@ -8,7 +8,7 @@ import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class WithoutOperator extends ExecOperator<Tablet> {
+public class WithoutNode extends PlanNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private WithoutPolicy discardPolicy;
@@ -19,7 +19,7 @@ public class WithoutOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }

[iotdb] 03/03: align class name

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/operator-design
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d8f2e1f29e0467bebc4dd69496fe4a2f967ffbaf
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 19:40:12 2022 +0800

    align class name
---
 .../distribution/operator/ExecutableOperator.java  | 34 -------------
 .../distribution/operator/SeriesScanNode.java      | 57 ----------------------
 .../operator/internal/DeviceMergeOperator.java     | 43 ----------------
 .../operator/internal/FillOperator.java            | 25 ----------
 .../operator/internal/FilterInternalOperator.java  | 26 ----------
 .../operator/internal/GroupByLevelOperator.java    | 34 -------------
 .../operator/internal/InternalOperator.java        |  7 ---
 .../operator/internal/LimitOperator.java           | 24 ---------
 .../operator/internal/OffsetOperator.java          | 25 ----------
 .../operator/internal/SeriesAggregateOperator.java | 33 -------------
 .../operator/internal/SortOperator.java            | 23 ---------
 .../operator/internal/TimeJoinOperator.java        | 33 -------------
 .../operator/internal/WithoutOperator.java         | 25 ----------
 .../operator/source/CsvSourceOperator.java         | 41 ----------------
 .../distribution/{operator => plan}/PlanNode.java  |  8 +--
 .../process}/DeviceMergeNode.java                  | 18 ++-----
 .../{operator => plan/process}/FillNode.java       | 15 ++----
 .../process/FilterNode.java}                       | 15 ++----
 .../process}/GroupByLevelNode.java                 | 15 ++----
 .../{operator => plan/process}/LimitNode.java      | 15 ++----
 .../{operator => plan/process}/OffsetNode.java     | 15 ++----
 .../distribution/plan/process/ProcessNode.java     |  6 +++
 .../process}/SeriesAggregateNode.java              | 21 ++++----
 .../{operator => plan/process}/SortNode.java       | 16 ++----
 .../{operator => plan/process}/TimeJoinNode.java   | 15 ++----
 .../{operator => plan/process}/WithoutNode.java    | 15 ++----
 .../sink/CsvSinkNode.java}                         | 18 ++-----
 .../sink/SinkNode.java}                            |  9 ++--
 .../sink/ThriftSinkNode.java}                      | 17 +------
 .../source/CsvSourceNode.java}                     | 14 +-----
 .../source/SeriesScanNode.java}                    | 25 ++--------
 .../source/SourceNode.java}                        |  7 ++-
 32 files changed, 64 insertions(+), 630 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
deleted file mode 100644
index 00a7a6a..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
-
-public abstract class ExecutableOperator<T> extends TreeNode<ExecutableOperator<?>> {
-
-  // Resource control, runtime control...
-
-  // Judge whether current operator has more result
-  public abstract boolean hasNext();
-
-  // Get next result batch of this operator
-  // Return null if there is no more result to return
-  public abstract T getNextBatch();
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
deleted file mode 100644
index 1c8ac3d..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-/**
- * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific series.
- * When reading data, the SeriesScanOperator can read the raw data batch by batch. And also, it can leverage
- * the filter and other info to decrease the result set.
- * Besides, the SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
- * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated statistic or has overlap
- * with pre-aggregated statistic, the SeriesScanOperator will read the raw data and calculate the aggregation result for
- * specific time range.
- *
- * Children type: []
- */
-public class SeriesScanNode extends PlanNode<SeriesBatchData> {
-
-    // The path of the target series which will be scanned.
-    private Path seriesPath;
-
-    // The order to traverse the data.
-    // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
-    // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-    private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
-
-    // Filter data in current series.
-    private Filter filter;
-
-    // Limit for result set. The default value is -1, which means no limit
-    private int limit = -1;
-
-    // offset for result set. The default value is 0
-    private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchData getNextBatch() {
-        return null;
-    }
-
-    // This method will only be invoked by SeriesAggregateOperator
-    // It will return the statistics of the series in given time range
-    // When calculate the statistics, the operator should use the most optimized way to do that. In other words, using
-    // raw data is the final way to do that.
-    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
deleted file mode 100644
index d4bbced..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And
- * output the result with specific order. The order could be 'order by device' or 'order by
- * timestamp'
- *
- * <p>The types of involved devices should be same. If the device contains n series, the
- * device-based view will contain n+2 columns, which are timestamp column, device name column and n
- * value columns of involved series.
- *
- * <p>Children type: [TimeJoinOperator]
- */
-public class DeviceMergeOperator extends InternalOperator<Tablet> {
-  // The result output order that this operator
-  private TraversalOrder mergeOrder;
-
-  // Owned devices
-  private List<String> ownedDeviceNameList;
-
-  // The map from deviceName to corresponding query result operator responsible for that device.
-  private Map<String, TimeJoinOperator> upstreamMap;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will
-  // contain n+1 columns where
-  // the additional column is `deviceName`
-  // And, the `alignedByDevice` in the TabletMetadata will be `true`
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
deleted file mode 100644
index 595a27e..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * FillOperator is used to fill the empty field in one row.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class FillOperator extends InternalOperator<Tablet> {
-
-  // The policy to discard the result from upstream operator
-  private FillPolicy fillPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
deleted file mode 100644
index f41d768..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
-
-/**
- * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node
- * IoTDB) The FilterExecOperator is responsible to filter the RowRecord from Tablet.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class FilterInternalOperator extends InternalOperator<Tablet> {
-
-  // The filter
-  private FilterOperator rowFilter;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
deleted file mode 100644
index 9cab1c0..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * This operator is responsible for the final aggregation merge operation. It will arrange the data
- * by time range firstly. And inside each time range, the data from same measurement and different
- * devices will be rolled up by corresponding level into different buckets. If the bucketInfo is
- * empty, the data from `same measurement and different devices` won't be rolled up. If the
- * groupByTimeParameter is null, the data won't be split by time range.
- *
- * <p>Children type: [SeriesAggregateOperator]
- */
-public class GroupByLevelOperator extends InternalOperator<Tablet> {
-
-  // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
-  private LevelBucketInfo bucketInfo;
-
-  // The parameter of `group by time`
-  // The GroupByLevelOperator also need GroupByTimeParameter
-  private GroupByTimeParameter groupByTimeParameter;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
deleted file mode 100644
index a5b8b5b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
-
-// 从 buffer 拉数据
-// 推送到下游的逻辑
-public abstract class InternalOperator<T> extends ExecutableOperator<T> {}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
deleted file mode 100644
index 1330eee..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * LimitOperator is used to select top n result. It uses the default order of upstream operators
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class LimitOperator extends InternalOperator<Tablet> {
-
-  // The limit count
-  private int limit;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
deleted file mode 100644
index aa82d8c..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of
- * upstream operators
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class OffsetOperator extends InternalOperator<Tablet> {
-
-  // The limit count
-  private int offset;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
deleted file mode 100644
index 889c50b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
-
-/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. This
- * operator will split data in one series into many groups by time range and do the aggregation
- * calculation for each group. If there is no split parameter, it will return one result which is
- * the aggregation result of all data in current series.
- *
- * <p>Children type: [SeriesScanOperator]
- */
-public class SeriesAggregateOperator extends InternalOperator<SeriesBatchAggInfo> {
-
-  // The parameter of `group by time`
-  // Its value will be null if there is no `group by time` clause,
-  private GroupByTimeParameter groupByTimeParameter;
-
-  // TODO: need consider how to represent the aggregation function and corresponding implementation
-  // We use a String to indicate the parameter temporarily
-  private String aggregationFunc;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchAggInfo getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
deleted file mode 100644
index d2aee44..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-/**
- * In general, the parameter in sortOperator should be pushed down to the upstream operators. In our
- * optimized logical query plan, the sortOperator should not appear.
- */
-public class SortOperator extends InternalOperator<Tablet> {
-
-  private TraversalOrder sortOrder;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
deleted file mode 100644
index bd7cf37..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * TimeJoinOperator is responsible for join two or more series. The join algorithm is like outer
- * join by timestamp column. The output result of TimeJoinOperator is sorted by timestamp
- *
- * <p>Children type: [SeriesScanOperator]
- */
-public class TimeJoinOperator extends InternalOperator<Tablet> {
-
-  // This parameter indicates the order when executing multiway merge sort.
-  private TraversalOrder mergeOrder;
-
-  // The policy to decide whether a row should be discarded
-  // The without policy is able to be push down to the TimeJoinOperator because we can know whether
-  // a row contains
-  // null or not in this operator the situation won't be changed by the downstream operators.
-  private WithoutPolicy withoutPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
deleted file mode 100644
index f678a2c..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * WithoutOperator is used to discard specific result from upstream operators.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class WithoutOperator extends InternalOperator<Tablet> {
-
-  // The policy to discard the result from upstream operator
-  private WithoutPolicy discardPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
deleted file mode 100644
index ff82128..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.query.distribution.operator.source;
-
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
-
-public class CsvSourceOperator extends SourceOperator<SeriesBatchData> {
-
-  @Override
-  public void close() throws Exception {}
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
similarity index 53%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
index ae30645..527b697 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan;
 
 import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
 
@@ -9,10 +9,4 @@ import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
  */
 public abstract class PlanNode<T> extends TreeNode<PlanNode<?>> {
 
-    // Judge whether current operator has more result
-    public abstract boolean hasNext();
-
-    // Get next result batch of this operator
-    // Return null if there is no more result to return
-    public abstract T getNextBatch();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
similarity index 65%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
index ff831d9..fbadebe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
@@ -1,7 +1,8 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 import java.util.List;
 import java.util.Map;
@@ -15,7 +16,7 @@ import java.util.Map;
  *
  * Children type: [TimeJoinOperator]
  */
-public class DeviceMergeNode extends PlanNode<TsBlock> {
+public class DeviceMergeNode extends ProcessNode<TsBlock> {
     // The order in which this operator outputs its result
     private TraversalOrder mergeOrder;
 
@@ -24,17 +25,4 @@ public class DeviceMergeNode extends PlanNode<TsBlock> {
 
     // The map from deviceName to corresponding query result operator responsible for that device.
     private Map<String, TimeJoinNode> upstreamMap;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will contain n+1 columns where
-    // the additional column is `deviceName`
-    // And, the `alignedByDevice` in the TabletMetadata will be `true`
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
similarity index 58%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
index a8f0076..8b08640 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
@@ -1,25 +1,16 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * FillOperator is used to fill the empty field in one row.
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FillNode extends PlanNode<TsBlock> {
+public class FillNode extends ProcessNode<TsBlock> {
 
     // The policy used to fill the empty fields in the result from the upstream operator
     private FillPolicy fillPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
similarity index 62%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
index 0bf0ad4..a579d4e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
@@ -1,6 +1,7 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
 /**
@@ -9,18 +10,8 @@ import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FilterExecNode extends PlanNode<TsBlock> {
+public class FilterNode extends ProcessNode<TsBlock> {
 
     // The filter
     private FilterOperator rowFilter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
similarity index 79%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
index 38448d5..2007900 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
@@ -1,8 +1,9 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * This operator is responsible for the final aggregation merge operation.
@@ -13,7 +14,7 @@ import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
  *
  * Children type: [SeriesAggregateOperator]
  */
-public class GroupByLevelNode extends PlanNode<TsBlock> {
+public class GroupByLevelNode extends ProcessNode<TsBlock> {
 
     // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
     private LevelBucketInfo bucketInfo;
@@ -21,14 +22,4 @@ public class GroupByLevelNode extends PlanNode<TsBlock> {
     // The parameter of `group by time`
     // The GroupByLevelOperator also need GroupByTimeParameter
     private GroupByTimeParameter groupByTimeParameter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
similarity index 52%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
index 875f8c4..e675086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
@@ -1,24 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * LimitOperator is used to select top n result. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class LimitNode extends PlanNode<TsBlock> {
+public class LimitNode extends ProcessNode<TsBlock> {
 
     // The limit count
     private int limit;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
similarity index 54%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
index 6de44c5..78ac9eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
@@ -1,24 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class OffsetNode extends PlanNode<TsBlock> {
+public class OffsetNode extends ProcessNode<TsBlock> {
 
     // The offset count
     private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
new file mode 100644
index 0000000..e7bffae
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
@@ -0,0 +1,6 @@
+package org.apache.iotdb.cluster.query.distribution.plan.process;
+
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+
+public class ProcessNode<T> extends PlanNode<T> {
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
similarity index 55%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
index 62fb223..46ef991 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
@@ -1,7 +1,11 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 /**
  * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
@@ -11,7 +15,7 @@ import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
  *
  * Children type: [SeriesScanOperator]
  */
-public class SeriesAggregateNode extends PlanNode<SeriesBatchAggInfo> {
+public class SeriesAggregateNode extends ProcessNode<TsBlock> {
 
     // The parameter of `group by time`
     // Its value will be null if there is no `group by time` clause,
@@ -21,13 +25,12 @@ public class SeriesAggregateNode extends PlanNode<SeriesBatchAggInfo> {
     // We use a String to indicate the parameter temporarily
     private String aggregationFunc;
 
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchAggInfo getNextBatch() {
+    // This method will only be invoked by SeriesAggregateOperator.
+    // It returns the statistics of the series in the given time range.
+    // When calculating the statistics, the operator should use the most
+    // optimized way available. In other words, computing them from raw data
+    // is the last resort.
+    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
similarity index 57%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
index 8bcc520..a718247 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
@@ -1,23 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * In general, the parameter in sortOperator should be pushed down to the upstream operators.
  * In our optimized logical query plan, the sortOperator should not appear.
  */
-public class SortNode extends PlanNode<TsBlock> {
+public class SortNode extends ProcessNode<TsBlock> {
 
     private TraversalOrder sortOrder;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
+    
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
similarity index 76%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
index b293a13..08a7617 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
@@ -1,8 +1,9 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * TimeJoinOperator is responsible for joining two or more series.
@@ -11,7 +12,7 @@ import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
  *
  * Children type: [SeriesScanOperator]
  */
-public class TimeJoinNode extends PlanNode<TsBlock> {
+public class TimeJoinNode extends ProcessNode<TsBlock> {
 
     // This parameter indicates the order when executing multiway merge sort.
     private TraversalOrder mergeOrder;
@@ -20,14 +21,4 @@ public class TimeJoinNode extends PlanNode<TsBlock> {
     // The without policy can be pushed down to the TimeJoinOperator because this operator can tell whether a row
     // contains null or not, and that situation won't be changed by the downstream operators.
     private WithoutPolicy withoutPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
similarity index 60%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
index c58a367..b3f1d9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
@@ -1,25 +1,16 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * WithoutOperator is used to discard specific result from upstream operators.
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class WithoutNode extends PlanNode<TsBlock> {
+public class WithoutNode extends ProcessNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private WithoutPolicy discardPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
similarity index 70%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
index c9de077..3e9dc4c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
@@ -17,25 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class ThriftSinkOperator extends SinkOperator<SeriesBatchData> {
-
+public class CsvSinkNode extends SinkNode<SeriesBatchData> {
   @Override
-  public void close() throws Exception {}
+  public void close() throws Exception {
 
-  @Override
-  public boolean hasNext() {
-    return false;
   }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
similarity index 70%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
index 304cafe..a94a0ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.source;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
-// 区分不同的数据源,可拓展。
-public abstract class SourceOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+// Builds the connection to the client.
+public abstract class SinkNode<T> extends PlanNode<T> implements AutoCloseable {
 
-  public abstract void open() throws Exception;
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
similarity index 74%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
index d8e6588..beed5a0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
@@ -17,25 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class RestSinkOperator extends SinkOperator<SeriesBatchData> {
+public class ThriftSinkNode extends SinkNode<SeriesBatchData> {
 
   @Override
   public void close() throws Exception {}
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
similarity index 78%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
index 17bba65..85f402a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
@@ -17,25 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class CsvSinkOperator extends SinkOperator<SeriesBatchData> {
+public class CsvSourceNode extends SourceNode<SeriesBatchData> {
 
   @Override
   public void close() throws Exception {}
 
   @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
   public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
similarity index 69%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
index fd67f9e..7ff8978 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.cluster.query.distribution.operator.source;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -18,7 +18,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  *
  * <p>Children type: []
  */
-public class SeriesScanOperator extends SourceOperator<SeriesBatchData> {
+public class SeriesScanNode extends SourceNode<TsBlock> {
 
   // The path of the target series which will be scanned.
   private Path seriesPath;
@@ -38,25 +38,6 @@ public class SeriesScanOperator extends SourceOperator<SeriesBatchData> {
   private int offset;
 
   @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  // This method will only be invoked by SeriesAggregateOperator
-  // It will return the statistics of the series in given time range
-  // When calculate the statistics, the operator should use the most optimized way to do that. In
-  // other words, using
-  // raw data is the final way to do that.
-  public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-    return null;
-  }
-
-  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
similarity index 75%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
index 29c906d..ed0e39a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
-// 构建与客户端的联系。
-public abstract class SinkOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+public abstract class SourceNode<T> extends PlanNode<T> implements AutoCloseable{
 
   public abstract void open() throws Exception;
 }

[iotdb] 02/03: Merge branch 'suyurong/operator-design' into xingtanzjr/operator-design

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/operator-design
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ed432ec7ff50845c464315710ce0d1c0cb3c848c
Merge: 1d7b20e bdbd38e
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 19:22:41 2022 +0800

    Merge branch 'suyurong/operator-design' into xingtanzjr/operator-design

 .../distribution/operator/ExecutableOperator.java  | 34 ++++++++++++
 .../operator/internal/DeviceMergeOperator.java     | 43 +++++++++++++++
 .../operator/internal/FillOperator.java            | 25 +++++++++
 .../operator/internal/FilterInternalOperator.java  | 26 +++++++++
 .../operator/internal/GroupByLevelOperator.java    | 34 ++++++++++++
 .../operator/internal/InternalOperator.java        |  7 +++
 .../operator/internal/LimitOperator.java           | 24 ++++++++
 .../operator/internal/OffsetOperator.java          | 25 +++++++++
 .../operator/internal/SeriesAggregateOperator.java | 33 +++++++++++
 .../operator/internal/SortOperator.java            | 23 ++++++++
 .../operator/internal/TimeJoinOperator.java        | 33 +++++++++++
 .../operator/internal/WithoutOperator.java         | 25 +++++++++
 .../operator/sink/CsvSinkOperator.java             | 41 ++++++++++++++
 .../operator/sink/RestSinkOperator.java            | 41 ++++++++++++++
 .../distribution/operator/sink/SinkOperator.java   | 28 ++++++++++
 .../operator/sink/ThriftSinkOperator.java          | 41 ++++++++++++++
 .../operator/source/CsvSourceOperator.java         | 41 ++++++++++++++
 .../operator/source/SeriesScanOperator.java        | 64 ++++++++++++++++++++++
 .../operator/source/SourceOperator.java            | 28 ++++++++++
 19 files changed, 616 insertions(+)