You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/14 05:53:32 UTC

[iotdb] branch xingtanzjr/mpp-query-basis created (now b06892c)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at b06892c  add RowBasedSeriesAggregateNode

This branch includes the following new commits:

     new bdaf57c  add basic operators
     new 4800140  complete DeviceMergeOperator and TimeJoinOperator
     new a95ca1e  complete raw data query operators design
     new 7e68943  complete basic design of all operators
     new 12d7af3  move query distribution folder to cluster
     new e11bd63  rename operator to node
     new f02e74e  design of source, sink and internal operators
     new d32aa16  design of source, sink and internal operators
     new e6d78c0  align class name
     new fe5d9d2  complete basic definition of all nodes
     new b06892c  add RowBasedSeriesAggregateNode

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 02/11: complete DeviceMergeOperator and TimeJoinOperator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 48001404b50e58968c3685e0002775dc98517e55
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Mar 1 21:36:12 2022 +0800

    complete DeviceMergeOperator and TimeJoinOperator
---
 .../distribution/common/DeviceMergeOrder.java      |  8 +++++
 .../iotdb/db/query/distribution/common/Tablet.java | 29 ++++++++++++++++
 .../query/distribution/common/TabletMetadata.java  | 21 ++++++++++++
 .../query/distribution/common/WithoutPolicy.java   |  6 ++++
 .../distribution/operator/DeviceMergeOperator.java | 40 ++++++++++++++++++++++
 .../distribution/operator/TimeJoinOperator.java    | 16 +++++++--
 6 files changed, 117 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/DeviceMergeOrder.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/DeviceMergeOrder.java
new file mode 100644
index 0000000..986beac
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/DeviceMergeOrder.java
@@ -0,0 +1,8 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+public enum DeviceMergeOrder {
+    TIMESTAMP_ASC,
+    TIMESTAMP_DESC,
+    DEVICE_NAME_ASC,
+    DEVICE_NAME_DESC,
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/Tablet.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/Tablet.java
new file mode 100644
index 0000000..956ccb0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/Tablet.java
@@ -0,0 +1,29 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+/**
+ * Intermediate result for most of ExecOperators.
+ * The Tablet contains data from one or more series and constructs them as a row based view
+ * The Tablet constructed with n series has n+1 columns where one column is timestamp and the other n columns are values
+ * from each series.
+ * The Tablet also contains the metadata of owned series.
+ *
+ * TODO: consider the detailed data store model in memory. (using column based or row based ?)
+ */
+public class Tablet {
+
+    private TabletMetadata metadata;
+
+    public boolean hasNext() {
+        return false;
+    }
+
+    public RowRecord getNext() {
+        return null;
+    }
+
+    public TabletMetadata getMetadata() {
+        return metadata;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TabletMetadata.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TabletMetadata.java
new file mode 100644
index 0000000..0a01959
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TabletMetadata.java
@@ -0,0 +1,21 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+import org.apache.iotdb.tsfile.read.common.Path;
+
+import java.util.List;
+
+public class TabletMetadata {
+    // list of all columns in current Tablet
+    // The column list not only contains the series column, but also contains other column to construct the final result
+    // set such as timestamp and deviceName
+    private List<String> columnList;
+
+    // Indicate whether the result set should be aligned by device. This parameter can be used for downstream operators
+    // when processing data from current Tablet. The RowRecord produced by Tablet with `alignedByDevice = true` will contain
+    // n + 1 fields which are n series field and 1 deviceName field.
+    // For example, when the FilterOperator executes the filter operation, it may need the deviceName field when matching
+    // the series with corresponding column in Tablet
+    //
+    // If alignedByDevice is true, the owned series should belong to one device
+    private boolean alignedByDevice;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/WithoutPolicy.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/WithoutPolicy.java
new file mode 100644
index 0000000..02d1e43
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/WithoutPolicy.java
@@ -0,0 +1,6 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+public enum WithoutPolicy {
+    CONTAINS_NULL,
+    ALL_NULL
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java
new file mode 100644
index 0000000..16099ca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java
@@ -0,0 +1,40 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.DeviceMergeOrder;
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And output the result with
+ * specific order. The order could be 'order by device' or 'order by timestamp'
+ *
+ * The types of involved devices should be same. If the device contains n series, the device-based view will contain n+2
+ * columns, which are timestamp column, device name column and n value columns of involved series.
+ *
+ * Children type: [TimeJoinOperator]
+ */
+public class DeviceMergeOperator extends ExecOperator<Tablet> {
+    // The result output order that this operator
+    private DeviceMergeOrder mergeOrder;
+
+    // Owned devices
+    private List<String> ownedDeviceNameList;
+
+    // The map from deviceName to corresponding query result operator responsible for that device.
+    private Map<String, TimeJoinOperator> upstreamMap;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will contain n+1 columns where
+    // the additional column is `deviceName`
+    // And, the `alignedByDevice` in the TabletMetadata will be `true`
+    @Override
+    public Tablet getNextBatch() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java
index 8361276..dbf1deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java
@@ -1,6 +1,8 @@
 package org.apache.iotdb.db.query.distribution.operator;
 
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.db.query.distribution.common.WithoutPolicy;
 
 /**
  * TimeJoinOperator is responsible for join two or more series.
@@ -9,7 +11,15 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
  *
  * Children type: [SeriesScanOperator]
  */
-public class TimeJoinOperator extends ExecOperator<RowRecord> {
+public class TimeJoinOperator extends ExecOperator<Tablet> {
+
+    // This parameter indicates the order when executing multiway merge sort.
+    private TraversalOrder mergeOrder;
+
+    // The policy to decide whether a row should be discarded
+    // The without policy is able to be push down to the TimeJoinOperator because we can know whether a row contains
+    // null or not in this operator, and the situation won't be changed by the downstream operators.
+    private WithoutPolicy withoutPolicy;
 
     @Override
     public boolean hasNext() {
@@ -17,7 +27,7 @@ public class TimeJoinOperator extends ExecOperator<RowRecord> {
     }
 
     @Override
-    public RowRecord getNextBatch() {
+    public Tablet getNextBatch() {
         return null;
     }
 }

[iotdb] 03/11: complete raw data query operators design

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a95ca1e3431b44551124f9ec82e1530fb4e89aff
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Mar 1 21:59:46 2022 +0800

    complete raw data query operators design
---
 .../distribution/common/DeviceMergeOrder.java      |  8 -------
 .../db/query/distribution/common/FillPolicy.java   |  5 +++++
 .../query/distribution/common/TraversalOrder.java  |  6 +++--
 .../distribution/operator/DeviceMergeOperator.java |  4 ++--
 .../query/distribution/operator/FillOperator.java  | 26 ++++++++++++++++++++++
 .../distribution/operator/FilterExecOperator.java  | 26 ++++++++++++++++++++++
 .../query/distribution/operator/LimitOperator.java | 24 ++++++++++++++++++++
 .../distribution/operator/OffsetOperator.java      | 24 ++++++++++++++++++++
 .../distribution/operator/SeriesScanOperator.java  |  7 +++---
 .../query/distribution/operator/SortOperator.java  | 23 +++++++++++++++++++
 .../distribution/operator/WithoutOperator.java     | 25 +++++++++++++++++++++
 11 files changed, 163 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/DeviceMergeOrder.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/DeviceMergeOrder.java
deleted file mode 100644
index 986beac..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/DeviceMergeOrder.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.iotdb.db.query.distribution.common;
-
-public enum DeviceMergeOrder {
-    TIMESTAMP_ASC,
-    TIMESTAMP_DESC,
-    DEVICE_NAME_ASC,
-    DEVICE_NAME_DESC,
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/FillPolicy.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/FillPolicy.java
new file mode 100644
index 0000000..ba59bad
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/FillPolicy.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+public enum FillPolicy {
+    PREVIOUS,
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java
index 6b00372..3b1bc92 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java
@@ -4,6 +4,8 @@ package org.apache.iotdb.db.query.distribution.common;
  * The traversal order for operators by timestamp
  */
 public enum TraversalOrder {
-    ASC,
-    DESC
+    TIMESTAMP_ASC,
+    TIMESTAMP_DESC,
+    DEVICE_NAME_ASC,
+    DEVICE_NAME_DESC,
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java
index 16099ca..45e52e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java
@@ -1,7 +1,7 @@
 package org.apache.iotdb.db.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.DeviceMergeOrder;
 import org.apache.iotdb.db.query.distribution.common.Tablet;
+import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
 
 import java.util.List;
 import java.util.Map;
@@ -17,7 +17,7 @@ import java.util.Map;
  */
 public class DeviceMergeOperator extends ExecOperator<Tablet> {
     // The result output order that this operator
-    private DeviceMergeOrder mergeOrder;
+    private TraversalOrder mergeOrder;
 
     // Owned devices
     private List<String> ownedDeviceNameList;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FillOperator.java
new file mode 100644
index 0000000..306c2b9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FillOperator.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.FillPolicy;
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+import org.apache.iotdb.db.query.distribution.common.WithoutPolicy;
+
+/**
+ * FillOperator is used to fill the empty field in one row.
+ *
+ * Children type: [All the operators whose result set is Tablet]
+ */
+public class FillOperator extends ExecOperator<Tablet> {
+
+    // The policy used to fill the empty fields in a row
+    private FillPolicy fillPolicy;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public Tablet getNextBatch() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FilterExecOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FilterExecOperator.java
new file mode 100644
index 0000000..65d2b8e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FilterExecOperator.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+
+/**
+ * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node IoTDB)
+ * The FilterExecOperator is responsible to filter the RowRecord from Tablet.
+ *
+ * Children type: [All the operators whose result set is Tablet]
+ */
+public class FilterExecOperator extends ExecOperator<Tablet> {
+
+    // The filter
+    private FilterOperator rowFilter;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public Tablet getNextBatch() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/LimitOperator.java
new file mode 100644
index 0000000..332c878
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/LimitOperator.java
@@ -0,0 +1,24 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+
+/**
+ * LimitOperator is used to select top n result. It uses the default order of upstream operators
+ *
+ * Children type: [All the operators whose result set is Tablet]
+ */
+public class LimitOperator extends ExecOperator<Tablet> {
+
+    // The limit count
+    private int limit;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public Tablet getNextBatch() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/OffsetOperator.java
new file mode 100644
index 0000000..c8c0ec4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/OffsetOperator.java
@@ -0,0 +1,24 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+
+/**
+ * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
+ *
+ * Children type: [All the operators whose result set is Tablet]
+ */
+public class OffsetOperator extends ExecOperator<Tablet> {
+
+    // The number of results to skip
+    private int offset;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public Tablet getNextBatch() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
index ebbc03d..11527a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
@@ -21,9 +21,10 @@ public class SeriesScanOperator extends ExecOperator<SeriesBatchData> {
     // The path of the target series which will be scanned.
     private Path seriesPath;
 
-    // The order to traverse the series by timestamp.
-    // The default order is ASC, which means "order by timestamp asc"
-    private TraversalOrder scanOrder = TraversalOrder.ASC;
+    // The order to traverse the data.
+    // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
+    // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
+    private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
 
     // Filter data in current series.
     private Filter filter;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SortOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SortOperator.java
new file mode 100644
index 0000000..c1e2b0a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SortOperator.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
+
+/**
+ * In general, the parameter in sortOperator should be pushed down to the upstream operators.
+ * In our optimized logical query plan, the sortOperator should not appear.
+ */
+public class SortOperator extends ExecOperator<Tablet>{
+
+    private TraversalOrder sortOrder;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public Tablet getNextBatch() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/WithoutOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/WithoutOperator.java
new file mode 100644
index 0000000..75250b9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/WithoutOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+import org.apache.iotdb.db.query.distribution.common.WithoutPolicy;
+
+/**
+ * WithoutOperator is used to discard specific result from upstream operators.
+ *
+ * Children type: [All the operators whose result set is Tablet]
+ */
+public class WithoutOperator extends ExecOperator<Tablet> {
+
+    // The policy to discard the result from upstream operator
+    private WithoutPolicy discardPolicy;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public Tablet getNextBatch() {
+        return null;
+    }
+}

[iotdb] 08/11: design of source, sink and internal operators

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d32aa168d108eaa4fd6a6784eaab92b97f8408b3
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Mar 8 23:40:05 2022 +0800

    design of source, sink and internal operators
---
 .../distribution/operator/ExecutableOperator.java  | 34 ++++++++++++++++++
 .../operator/internal/DeviceMergeOperator.java     |  2 +-
 .../operator/internal/FillOperator.java            |  2 +-
 .../operator/internal/FilterInternalOperator.java  |  2 +-
 .../operator/internal/GroupByLevelOperator.java    |  2 +-
 .../operator/internal/InternalOperator.java        | 20 +++--------
 .../operator/internal/LimitOperator.java           |  2 +-
 .../operator/internal/OffsetOperator.java          |  2 +-
 .../operator/internal/SeriesAggregateOperator.java |  2 +-
 .../operator/internal/SortOperator.java            |  2 +-
 .../operator/internal/TimeJoinOperator.java        |  2 +-
 .../operator/internal/WithoutOperator.java         |  2 +-
 .../operator/sink/CsvSinkOperator.java             | 41 ++++++++++++++++++++++
 .../operator/sink/RestSinkOperator.java            | 41 ++++++++++++++++++++++
 .../distribution/operator/sink/SinkOperator.java   | 28 +++++++++++++++
 .../operator/sink/ThriftSinkOperator.java          | 41 ++++++++++++++++++++++
 .../operator/source/CsvSourceOperator.java         | 41 ++++++++++++++++++++++
 .../operator/source/SeriesScanOperator.java        | 10 ++++--
 .../operator/source/SourceOperator.java            | 28 +++++++++++++++
 19 files changed, 277 insertions(+), 27 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
new file mode 100644
index 0000000..00a7a6a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
+
+public abstract class ExecutableOperator<T> extends TreeNode<ExecutableOperator<?>> {
+
+  // Resource control, runtime control...
+
+  // Judge whether current operator has more result
+  public abstract boolean hasNext();
+
+  // Get next result batch of this operator
+  // Return null if there is no more result to return
+  public abstract T getNextBatch();
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
index b912994..d4bbced 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
index 8ee610a..595a27e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
index ecbda0f..f41d768 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
index c6e00d2..9cab1c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
index 0692381..a5b8b5b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
@@ -1,17 +1,7 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
-import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
+import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
 
-/**
- * @author xingtanzjr The base class of query executable operators, which is used to compose logical
- *     query plan. TODO: consider how to restrict the children type for each type of ExecOperator
- */
-public abstract class InternalOperator<T> extends TreeNode<InternalOperator<?>> {
-
-  // Judge whether current operator has more result
-  public abstract boolean hasNext();
-
-  // Get next result batch of this operator
-  // Return null if there is no more result to return
-  public abstract T getNextBatch();
-}
+// Pull data from the buffer
+// Logic for pushing data downstream
+public abstract class InternalOperator<T> extends ExecutableOperator<T> {}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
index 9075f64..1330eee 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
index c6f79ac..aa82d8c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
index 96a5c86..889c50b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
index fa83be7..d2aee44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
index c4ae6f3..bd7cf37 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
index 535a3d6..f678a2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
new file mode 100644
index 0000000..17bba65
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.sink;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+
+public class CsvSinkOperator extends SinkOperator<SeriesBatchData> {
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
new file mode 100644
index 0000000..d8e6588
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.sink;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+
+public class RestSinkOperator extends SinkOperator<SeriesBatchData> {
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
new file mode 100644
index 0000000..29c906d
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.sink;
+
+import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+
+// Establishes the connection with the client.
+public abstract class SinkOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+
+  public abstract void open() throws Exception;
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
new file mode 100644
index 0000000..c9de077
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.sink;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+
+public class ThriftSinkOperator extends SinkOperator<SeriesBatchData> {
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
new file mode 100644
index 0000000..ff82128
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.source;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+
+public class CsvSourceOperator extends SourceOperator<SeriesBatchData> {
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
index 9cd6043..fd67f9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.source;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
@@ -18,7 +18,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  *
  * <p>Children type: []
  */
-public class SeriesScanOperator extends InternalOperator<SeriesBatchData> {
+public class SeriesScanOperator extends SourceOperator<SeriesBatchData> {
 
   // The path of the target series which will be scanned.
   private Path seriesPath;
@@ -55,4 +55,10 @@ public class SeriesScanOperator extends InternalOperator<SeriesBatchData> {
   public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
     return null;
   }
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
new file mode 100644
index 0000000..304cafe
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.source;
+
+import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+
+// Distinguishes between different data sources; extensible.
+public abstract class SourceOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+
+  public abstract void open() throws Exception;
+}

[iotdb] 09/11: align class name

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6d78c0a621eebac0f21fd976e0656d58c5242fc
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 19:40:12 2022 +0800

    align class name
---
 .../distribution/operator/ExecutableOperator.java  | 34 -------------
 .../distribution/operator/SeriesScanNode.java      | 57 ----------------------
 .../operator/internal/DeviceMergeOperator.java     | 43 ----------------
 .../operator/internal/FillOperator.java            | 25 ----------
 .../operator/internal/FilterInternalOperator.java  | 26 ----------
 .../operator/internal/GroupByLevelOperator.java    | 34 -------------
 .../operator/internal/InternalOperator.java        |  7 ---
 .../operator/internal/LimitOperator.java           | 24 ---------
 .../operator/internal/OffsetOperator.java          | 25 ----------
 .../operator/internal/SeriesAggregateOperator.java | 33 -------------
 .../operator/internal/SortOperator.java            | 23 ---------
 .../operator/internal/TimeJoinOperator.java        | 33 -------------
 .../operator/internal/WithoutOperator.java         | 25 ----------
 .../operator/source/CsvSourceOperator.java         | 41 ----------------
 .../distribution/{operator => plan}/PlanNode.java  |  8 +--
 .../process}/DeviceMergeNode.java                  | 18 ++-----
 .../{operator => plan/process}/FillNode.java       | 15 ++----
 .../process/FilterNode.java}                       | 15 ++----
 .../process}/GroupByLevelNode.java                 | 15 ++----
 .../{operator => plan/process}/LimitNode.java      | 15 ++----
 .../{operator => plan/process}/OffsetNode.java     | 15 ++----
 .../distribution/plan/process/ProcessNode.java     |  6 +++
 .../process}/SeriesAggregateNode.java              | 21 ++++----
 .../{operator => plan/process}/SortNode.java       | 16 ++----
 .../{operator => plan/process}/TimeJoinNode.java   | 15 ++----
 .../{operator => plan/process}/WithoutNode.java    | 15 ++----
 .../sink/CsvSinkNode.java}                         | 18 ++-----
 .../sink/SinkNode.java}                            |  9 ++--
 .../sink/ThriftSinkNode.java}                      | 17 +------
 .../source/CsvSourceNode.java}                     | 14 +-----
 .../source/SeriesScanNode.java}                    | 25 ++--------
 .../source/SourceNode.java}                        |  7 ++-
 32 files changed, 64 insertions(+), 630 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
deleted file mode 100644
index 00a7a6a..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
-
-public abstract class ExecutableOperator<T> extends TreeNode<ExecutableOperator<?>> {
-
-  // Resource control, runtime control...
-
-  // Judge whether current operator has more result
-  public abstract boolean hasNext();
-
-  // Get next result batch of this operator
-  // Return null if there is no more result to return
-  public abstract T getNextBatch();
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
deleted file mode 100644
index 1c8ac3d..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-/**
- * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific series.
- * When reading data, the SeriesScanOperator can read the raw data batch by batch. And also, it can leverage
- * the filter and other info to decrease the result set.
- * Besides, the SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
- * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated statistic or has overlap
- * with pre-aggregated statistic, the SeriesScanOperator will read the raw data and calculate the aggregation result for
- * specific time range.
- *
- * Children type: []
- */
-public class SeriesScanNode extends PlanNode<SeriesBatchData> {
-
-    // The path of the target series which will be scanned.
-    private Path seriesPath;
-
-    // The order to traverse the data.
-    // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
-    // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-    private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
-
-    // Filter data in current series.
-    private Filter filter;
-
-    // Limit for result set. The default value is -1, which means no limit
-    private int limit = -1;
-
-    // offset for result set. The default value is 0
-    private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchData getNextBatch() {
-        return null;
-    }
-
-    // This method will only be invoked by SeriesAggregateOperator
-    // It will return the statistics of the series in given time range
-    // When calculate the statistics, the operator should use the most optimized way to do that. In other words, using
-    // raw data is the final way to do that.
-    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
deleted file mode 100644
index d4bbced..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And
- * output the result with specific order. The order could be 'order by device' or 'order by
- * timestamp'
- *
- * <p>The types of involved devices should be same. If the device contains n series, the
- * device-based view will contain n+2 columns, which are timestamp column, device name column and n
- * value columns of involved series.
- *
- * <p>Children type: [TimeJoinOperator]
- */
-public class DeviceMergeOperator extends InternalOperator<Tablet> {
-  // The result output order that this operator
-  private TraversalOrder mergeOrder;
-
-  // Owned devices
-  private List<String> ownedDeviceNameList;
-
-  // The map from deviceName to corresponding query result operator responsible for that device.
-  private Map<String, TimeJoinOperator> upstreamMap;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will
-  // contain n+1 columns where
-  // the additional column is `deviceName`
-  // And, the `alignedByDevice` in the TabletMetadata will be `true`
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
deleted file mode 100644
index 595a27e..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * FillOperator is used to fill the empty field in one row.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class FillOperator extends InternalOperator<Tablet> {
-
-  // The policy to discard the result from upstream operator
-  private FillPolicy fillPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
deleted file mode 100644
index f41d768..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
-
-/**
- * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node
- * IoTDB) The FilterExecOperator is responsible to filter the RowRecord from Tablet.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class FilterInternalOperator extends InternalOperator<Tablet> {
-
-  // The filter
-  private FilterOperator rowFilter;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
deleted file mode 100644
index 9cab1c0..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * This operator is responsible for the final aggregation merge operation. It will arrange the data
- * by time range firstly. And inside each time range, the data from same measurement and different
- * devices will be rolled up by corresponding level into different buckets. If the bucketInfo is
- * empty, the data from `same measurement and different devices` won't be rolled up. If the
- * groupByTimeParameter is null, the data won't be split by time range.
- *
- * <p>Children type: [SeriesAggregateOperator]
- */
-public class GroupByLevelOperator extends InternalOperator<Tablet> {
-
-  // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
-  private LevelBucketInfo bucketInfo;
-
-  // The parameter of `group by time`
-  // The GroupByLevelOperator also need GroupByTimeParameter
-  private GroupByTimeParameter groupByTimeParameter;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
deleted file mode 100644
index a5b8b5b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
-
-// 从 buffer 拉数据
-// 推送到下游的逻辑
-public abstract class InternalOperator<T> extends ExecutableOperator<T> {}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
deleted file mode 100644
index 1330eee..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * LimitOperator is used to select top n result. It uses the default order of upstream operators
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class LimitOperator extends InternalOperator<Tablet> {
-
-  // The limit count
-  private int limit;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
deleted file mode 100644
index aa82d8c..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of
- * upstream operators
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class OffsetOperator extends InternalOperator<Tablet> {
-
-  // The limit count
-  private int offset;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
deleted file mode 100644
index 889c50b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
-
-/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. This
- * operator will split data in one series into many groups by time range and do the aggregation
- * calculation for each group. If there is no split parameter, it will return one result which is
- * the aggregation result of all data in current series.
- *
- * <p>Children type: [SeriesScanOperator]
- */
-public class SeriesAggregateOperator extends InternalOperator<SeriesBatchAggInfo> {
-
-  // The parameter of `group by time`
-  // Its value will be null if there is no `group by time` clause,
-  private GroupByTimeParameter groupByTimeParameter;
-
-  // TODO: need consider how to represent the aggregation function and corresponding implementation
-  // We use a String to indicate the parameter temporarily
-  private String aggregationFunc;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchAggInfo getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
deleted file mode 100644
index d2aee44..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-/**
- * In general, the parameter in sortOperator should be pushed down to the upstream operators. In our
- * optimized logical query plan, the sortOperator should not appear.
- */
-public class SortOperator extends InternalOperator<Tablet> {
-
-  private TraversalOrder sortOrder;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
deleted file mode 100644
index bd7cf37..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * TimeJoinOperator is responsible for join two or more series. The join algorithm is like outer
- * join by timestamp column. The output result of TimeJoinOperator is sorted by timestamp
- *
- * <p>Children type: [SeriesScanOperator]
- */
-public class TimeJoinOperator extends InternalOperator<Tablet> {
-
-  // This parameter indicates the order when executing multiway merge sort.
-  private TraversalOrder mergeOrder;
-
-  // The policy to decide whether a row should be discarded
-  // The without policy is able to be push down to the TimeJoinOperator because we can know whether
-  // a row contains
-  // null or not in this operator the situation won't be changed by the downstream operators.
-  private WithoutPolicy withoutPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
deleted file mode 100644
index f678a2c..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator.internal;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * WithoutOperator is used to discard specific result from upstream operators.
- *
- * <p>Children type: [All the operators whose result set is Tablet]
- */
-public class WithoutOperator extends InternalOperator<Tablet> {
-
-  // The policy to discard the result from upstream operator
-  private WithoutPolicy discardPolicy;
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public Tablet getNextBatch() {
-    return null;
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
deleted file mode 100644
index ff82128..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.query.distribution.operator.source;
-
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
-
-public class CsvSourceOperator extends SourceOperator<SeriesBatchData> {
-
-  @Override
-  public void close() throws Exception {}
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
similarity index 53%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
index ae30645..527b697 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan;
 
 import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
 
@@ -9,10 +9,4 @@ import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
  */
 public abstract class PlanNode<T> extends TreeNode<PlanNode<?>> {
 
-    // Judge whether current operator has more result
-    public abstract boolean hasNext();
-
-    // Get next result batch of this operator
-    // Return null if there is no more result to return
-    public abstract T getNextBatch();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
similarity index 65%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
index ff831d9..fbadebe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
@@ -1,7 +1,8 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 import java.util.List;
 import java.util.Map;
@@ -15,7 +16,7 @@ import java.util.Map;
  *
  * Children type: [TimeJoinOperator]
  */
-public class DeviceMergeNode extends PlanNode<TsBlock> {
+public class DeviceMergeNode extends ProcessNode<TsBlock> {
     // The result output order that this operator
     private TraversalOrder mergeOrder;
 
@@ -24,17 +25,4 @@ public class DeviceMergeNode extends PlanNode<TsBlock> {
 
     // The map from deviceName to corresponding query result operator responsible for that device.
     private Map<String, TimeJoinNode> upstreamMap;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will contain n+1 columns where
-    // the additional column is `deviceName`
-    // And, the `alignedByDevice` in the TabletMetadata will be `true`
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
similarity index 58%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
index a8f0076..8b08640 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
@@ -1,25 +1,16 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * FillOperator is used to fill the empty field in one row.
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FillNode extends PlanNode<TsBlock> {
+public class FillNode extends ProcessNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private FillPolicy fillPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
similarity index 62%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
index 0bf0ad4..a579d4e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
@@ -1,6 +1,7 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
 /**
@@ -9,18 +10,8 @@ import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FilterExecNode extends PlanNode<TsBlock> {
+public class FilterNode extends ProcessNode<TsBlock> {
 
     // The filter
     private FilterOperator rowFilter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
similarity index 79%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
index 38448d5..2007900 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
@@ -1,8 +1,9 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * This operator is responsible for the final aggregation merge operation.
@@ -13,7 +14,7 @@ import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
  *
  * Children type: [SeriesAggregateOperator]
  */
-public class GroupByLevelNode extends PlanNode<TsBlock> {
+public class GroupByLevelNode extends ProcessNode<TsBlock> {
 
     // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
     private LevelBucketInfo bucketInfo;
@@ -21,14 +22,4 @@ public class GroupByLevelNode extends PlanNode<TsBlock> {
     // The parameter of `group by time`
     // The GroupByLevelOperator also need GroupByTimeParameter
     private GroupByTimeParameter groupByTimeParameter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
similarity index 52%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
index 875f8c4..e675086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
@@ -1,24 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * LimitOperator is used to select top n result. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class LimitNode extends PlanNode<TsBlock> {
+public class LimitNode extends ProcessNode<TsBlock> {
 
     // The limit count
     private int limit;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
similarity index 54%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
index 6de44c5..78ac9eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
@@ -1,24 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class OffsetNode extends PlanNode<TsBlock> {
+public class OffsetNode extends ProcessNode<TsBlock> {
 
     // The limit count
     private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
new file mode 100644
index 0000000..e7bffae
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
@@ -0,0 +1,6 @@
+package org.apache.iotdb.cluster.query.distribution.plan.process;
+
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+
+public class ProcessNode<T> extends PlanNode<T> {
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
similarity index 55%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
index 62fb223..46ef991 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
@@ -1,7 +1,11 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 /**
  * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
@@ -11,7 +15,7 @@ import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
  *
  * Children type: [SeriesScanOperator]
  */
-public class SeriesAggregateNode extends PlanNode<SeriesBatchAggInfo> {
+public class SeriesAggregateNode extends ProcessNode<TsBlock> {
 
     // The parameter of `group by time`
     // Its value will be null if there is no `group by time` clause,
@@ -21,13 +25,12 @@ public class SeriesAggregateNode extends PlanNode<SeriesBatchAggInfo> {
     // We use a String to indicate the parameter temporarily
     private String aggregationFunc;
 
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchAggInfo getNextBatch() {
+    // This method will only be invoked by SeriesAggregateOperator.
+    // It returns the statistics of the series in the given time range.
+    // When calculating the statistics, the operator should use the most
+    // optimized way to do that. In other words, using raw data is the
+    // last resort.
+    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
similarity index 57%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
index 8bcc520..a718247 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
@@ -1,23 +1,15 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * In general, the parameter in sortOperator should be pushed down to the upstream operators.
  * In our optimized logical query plan, the sortOperator should not appear.
  */
-public class SortNode extends PlanNode<TsBlock> {
+public class SortNode extends ProcessNode<TsBlock> {
 
     private TraversalOrder sortOrder;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
+    
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
similarity index 76%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
index b293a13..08a7617 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
@@ -1,8 +1,9 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * TimeJoinOperator is responsible for join two or more series.
@@ -11,7 +12,7 @@ import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
  *
  * Children type: [SeriesScanOperator]
  */
-public class TimeJoinNode extends PlanNode<TsBlock> {
+public class TimeJoinNode extends ProcessNode<TsBlock> {
 
     // This parameter indicates the order when executing multiway merge sort.
     private TraversalOrder mergeOrder;
@@ -20,14 +21,4 @@ public class TimeJoinNode extends PlanNode<TsBlock> {
     // The without policy is able to be push down to the TimeJoinOperator because we can know whether a row contains
     // null or not in this operator the situation won't be changed by the downstream operators.
     private WithoutPolicy withoutPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
similarity index 60%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
index c58a367..b3f1d9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
@@ -1,25 +1,16 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
 /**
  * WithoutOperator is used to discard specific result from upstream operators.
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class WithoutNode extends PlanNode<TsBlock> {
+public class WithoutNode extends ProcessNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private WithoutPolicy discardPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public TsBlock getNextBatch() {
-        return null;
-    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
similarity index 70%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
index c9de077..3e9dc4c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
@@ -17,25 +17,13 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class ThriftSinkOperator extends SinkOperator<SeriesBatchData> {
-
+public class CsvSinkNode extends SinkNode<SeriesBatchData> {
   @Override
-  public void close() throws Exception {}
+  public void close() throws Exception {
 
-  @Override
-  public boolean hasNext() {
-    return false;
   }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
similarity index 70%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
index 304cafe..a94a0ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.source;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
-// 区分不同的数据源,可拓展。
-public abstract class SourceOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+// Builds the connection to the client.
+public abstract class SinkNode<T> extends PlanNode<T> implements AutoCloseable {
 
-  public abstract void open() throws Exception;
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
similarity index 74%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
index d8e6588..beed5a0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
@@ -17,25 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class RestSinkOperator extends SinkOperator<SeriesBatchData> {
+public class ThriftSinkNode extends SinkNode<SeriesBatchData> {
 
   @Override
   public void close() throws Exception {}
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
-  public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
similarity index 78%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
index 17bba65..85f402a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
@@ -17,25 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 
-public class CsvSinkOperator extends SinkOperator<SeriesBatchData> {
+public class CsvSourceNode extends SourceNode<SeriesBatchData> {
 
   @Override
   public void close() throws Exception {}
 
   @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  @Override
   public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
similarity index 69%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
index fd67f9e..7ff8978 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.cluster.query.distribution.operator.source;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -18,7 +18,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  *
  * <p>Children type: []
  */
-public class SeriesScanOperator extends SourceOperator<SeriesBatchData> {
+public class SeriesScanNode extends SourceNode<TsBlock> {
 
   // The path of the target series which will be scanned.
   private Path seriesPath;
@@ -38,25 +38,6 @@ public class SeriesScanOperator extends SourceOperator<SeriesBatchData> {
   private int offset;
 
   @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public SeriesBatchData getNextBatch() {
-    return null;
-  }
-
-  // This method will only be invoked by SeriesAggregateOperator
-  // It will return the statistics of the series in given time range
-  // When calculate the statistics, the operator should use the most optimized way to do that. In
-  // other words, using
-  // raw data is the final way to do that.
-  public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-    return null;
-  }
-
-  @Override
   public void close() throws Exception {}
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
similarity index 75%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
index 29c906d..ed0e39a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
@@ -17,12 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.distribution.operator.sink;
+package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
 
-// 构建与客户端的联系。
-public abstract class SinkOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+public abstract class SourceNode<T> extends PlanNode<T> implements AutoCloseable{
 
   public abstract void open() throws Exception;
 }

[iotdb] 04/11: complete basic design of all operators

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7e68943cab0fb1ab53654c5a324673ba86c9058b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Mar 1 23:20:29 2022 +0800

    complete basic design of all operators
---
 .../distribution/common/GroupByTimeParameter.java  | 10 +++++++
 .../query/distribution/common/LevelBucketInfo.java | 15 ++++++++++
 .../distribution/common/SeriesBatchAggInfo.java    | 21 +++++++++++++
 .../operator/GroupByLevelOperator.java             | 34 ++++++++++++++++++++++
 .../operator/SeriesAggregateOperator.java          | 33 +++++++++++++++++++++
 .../distribution/operator/SeriesScanOperator.java  | 10 +++++++
 6 files changed, 123 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java
new file mode 100644
index 0000000..3f7e1bc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java
@@ -0,0 +1,10 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+
+/**
+ * In single-node IoTDB, the GroupByTimePlan is used to represent the parameter of `group by time`.
+ * To avoid ambiguity, we use another name `GroupByTimeParameter` here
+ */
+public class GroupByTimeParameter extends GroupByTimePlan {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java
new file mode 100644
index 0000000..6888475
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java
@@ -0,0 +1,15 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is used to store all the buckets for the GroupByLevelOperator
+ * It stores the levels index and all the enumerated values in each level by a HashMap
+ * Using the HashMap, the operator could calculate all the buckets using combination of values from each level
+ */
+public class LevelBucketInfo {
+    // eg: If the clause is `group by level = 1, 2, 3`, the map should be like
+    // map{1 -> ['a', 'b'], 2 -> ['aa', 'bb'], 3 -> ['aaa', 'bbb']}
+    private Map<Integer, List<String>> levelMap;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java
new file mode 100644
index 0000000..2ea9f9a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java
@@ -0,0 +1,21 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+/**
+ * SeriesBatchAggInfo is the "batch" result of SeriesAggregateOperator when its getNextBatch() is invoked.
+ */
+public class SeriesBatchAggInfo {
+    // Path of the series.
+    // Path will be used in the downstream operators.
+    // GroupByLevelOperator will use it to divide the data into different buckets to do the rollup operation.
+    private Path path;
+
+    // Time range of current statistic.
+    private TimeRange timeRange;
+
+    // Statistics for the series in current time range
+    private Statistics<?> statistics;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java
new file mode 100644
index 0000000..f5fe41d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.db.query.distribution.common.LevelBucketInfo;
+import org.apache.iotdb.db.query.distribution.common.Tablet;
+
+/**
+ * This operator is responsible for the final aggregation merge operation.
+ * It will arrange the data by time range firstly. And inside each time range, the data from same measurement and
+ * different devices will be rolled up by corresponding level into different buckets.
+ * If the bucketInfo is empty, the data from `same measurement and different devices` won't be rolled up.
+ * If the groupByTimeParameter is null, the data won't be split by time range.
+ *
+ * Children type: [SeriesAggregateOperator]
+ */
+public class GroupByLevelOperator extends ExecOperator<Tablet> {
+
+    // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
+    private LevelBucketInfo bucketInfo;
+
+    // The parameter of `group by time`
+    // The GroupByLevelOperator also need GroupByTimeParameter
+    private GroupByTimeParameter groupByTimeParameter;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public Tablet getNextBatch() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java
new file mode 100644
index 0000000..e5d02f8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.db.query.distribution.common.SeriesBatchAggInfo;
+
+/**
+ * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
+ * This operator will split data in one series into many groups by time range and do the aggregation calculation for each
+ * group.
+ * If there is no split parameter, it will return one result which is the aggregation result of all data in current series.
+ *
+ * Children type: [SeriesScanOperator]
+ */
+public class SeriesAggregateOperator extends ExecOperator<SeriesBatchAggInfo> {
+
+    // The parameter of `group by time`
+    // Its value will be null if there is no `group by time` clause,
+    private GroupByTimeParameter groupByTimeParameter;
+
+    // TODO: need consider how to represent the aggregation function and corresponding implementation
+    // We use a String to indicate the parameter temporarily
+    private String aggregationFunc;
+
+    @Override
+    public boolean hasNext() {
+        return false;
+    }
+
+    @Override
+    public SeriesBatchAggInfo getNextBatch() {
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
index 11527a5..94bc7da 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
@@ -2,7 +2,9 @@ package org.apache.iotdb.db.query.distribution.operator;
 
 import org.apache.iotdb.db.query.distribution.common.SeriesBatchData;
 import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 /**
@@ -44,4 +46,12 @@ public class SeriesScanOperator extends ExecOperator<SeriesBatchData> {
     public SeriesBatchData getNextBatch() {
         return null;
     }
+
+    // This method will only be invoked by SeriesAggregateOperator
+    // It will return the statistics of the series in given time range
+    // When calculate the statistics, the operator should use the most optimized way to do that. In other words, using
+    // raw data is the final way to do that.
+    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
+        return null;
+    }
 }

[iotdb] 11/11: add RowBasedSeriesAggregateNode

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b06892c1b901cbbf2a3859f13961463ef42c41b1
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 22:39:27 2022 +0800

    add RowBasedSeriesAggregateNode
---
 .../plan/process/RowBasedSeriesAggregateNode.java  | 37 ++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/RowBasedSeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/RowBasedSeriesAggregateNode.java
new file mode 100644
index 0000000..5ed718e
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/RowBasedSeriesAggregateNode.java
@@ -0,0 +1,37 @@
+package org.apache.iotdb.cluster.query.distribution.plan.process;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+
+import java.util.List;
+
+/**
+ * This node is used to aggregate required series by raw data.
+ * The raw data will be input as a TsBlock. This node will output the series aggregated result represented by TsBlock
+ * Thus, the columns in output TsBlock will be different from input TsBlock.
+ */
+public class RowBasedSeriesAggregateNode extends ProcessNode {
+    // The parameter of `group by time`
+    // Its value will be null if there is no `group by time` clause,
+    private GroupByTimeParameter groupByTimeParameter;
+
+    // The list of aggregation functions, each FunctionExpression will be output as one column of result TsBlock
+    // (Currently we only support one series in the aggregation function)
+    // TODO: need to consider whether FunctionExpression is suitable for representing the aggregation function
+    private List<FunctionExpression> aggregateFuncList;
+
+    public RowBasedSeriesAggregateNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList) {
+        this(id);
+        this.aggregateFuncList = aggregateFuncList;
+    }
+
+    public RowBasedSeriesAggregateNode(PlanNodeId id, List<FunctionExpression> aggregateFuncList, GroupByTimeParameter groupByTimeParameter) {
+        this(id, aggregateFuncList);
+        this.groupByTimeParameter = groupByTimeParameter;
+    }
+}

[iotdb] 07/11: design of source, sink and internal operators

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f02e74eed0091af5d8a5b380c15a8537260baf02
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Mar 8 23:39:41 2022 +0800

    design of source, sink and internal operators
---
 .../operator/internal/DeviceMergeOperator.java     | 43 ++++++++++++++++
 .../operator/internal/FillOperator.java            | 25 ++++++++++
 .../operator/internal/FilterInternalOperator.java  | 26 ++++++++++
 .../operator/internal/GroupByLevelOperator.java    | 34 +++++++++++++
 .../operator/internal/InternalOperator.java        | 17 +++++++
 .../operator/internal/LimitOperator.java           | 24 +++++++++
 .../operator/internal/OffsetOperator.java          | 25 ++++++++++
 .../operator/internal/SeriesAggregateOperator.java | 33 ++++++++++++
 .../operator/internal/SortOperator.java            | 23 +++++++++
 .../operator/internal/TimeJoinOperator.java        | 33 ++++++++++++
 .../operator/internal/WithoutOperator.java         | 25 ++++++++++
 .../operator/source/SeriesScanOperator.java        | 58 ++++++++++++++++++++++
 12 files changed, 366 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
new file mode 100644
index 0000000..b912994
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
@@ -0,0 +1,43 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And
+ * output the result with specific order. The order could be 'order by device' or 'order by
+ * timestamp'
+ *
+ * <p>The types of involved devices should be same. If the device contains n series, the
+ * device-based view will contain n+2 columns, which are timestamp column, device name column and n
+ * value columns of involved series.
+ *
+ * <p>Children type: [TimeJoinOperator]
+ */
+public class DeviceMergeOperator extends InternalOperator<Tablet> {
+  // The order in which this operator outputs its result
+  private TraversalOrder mergeOrder;
+
+  // Owned devices
+  private List<String> ownedDeviceNameList;
+
+  // The map from deviceName to corresponding query result operator responsible for that device.
+  private Map<String, TimeJoinOperator> upstreamMap;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will
+  // contain n+1 columns where
+  // the additional column is `deviceName`
+  // And, the `alignedByDevice` in the TabletMetadata will be `true`
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
new file mode 100644
index 0000000..8ee610a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * FillOperator is used to fill the empty field in one row.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class FillOperator extends InternalOperator<Tablet> {
+
+  // The policy used to fill the empty fields in a row
+  private FillPolicy fillPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
new file mode 100644
index 0000000..ecbda0f
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+/**
+ * (We use FilterInternalOperator to distinguish it from the FilterOperator used in single-node
+ * IoTDB.) The FilterInternalOperator is responsible for filtering the RowRecord from Tablet.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class FilterInternalOperator extends InternalOperator<Tablet> {
+
+  // The filter
+  private FilterOperator rowFilter;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
new file mode 100644
index 0000000..c6e00d2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * This operator is responsible for the final aggregation merge operation. It will arrange the data
+ * by time range firstly. And inside each time range, the data from same measurement and different
+ * devices will be rolled up by corresponding level into different buckets. If the bucketInfo is
+ * empty, the data from `same measurement and different devices` won't be rolled up. If the
+ * groupByTimeParameter is null, the data won't be split by time range.
+ *
+ * <p>Children type: [SeriesAggregateOperator]
+ */
+public class GroupByLevelOperator extends InternalOperator<Tablet> {
+
+  // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
+  private LevelBucketInfo bucketInfo;
+
+  // The parameter of `group by time`
+  // The GroupByLevelOperator also need GroupByTimeParameter
+  private GroupByTimeParameter groupByTimeParameter;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
new file mode 100644
index 0000000..0692381
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
@@ -0,0 +1,17 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
+
+/**
+ * @author xingtanzjr The base class of query executable operators, which is used to compose logical
+ *     query plan. TODO: consider how to restrict the child types for each type of InternalOperator
+ */
+public abstract class InternalOperator<T> extends TreeNode<InternalOperator<?>> {
+
+  // Judge whether current operator has more result
+  public abstract boolean hasNext();
+
+  // Get next result batch of this operator
+  // Return null if there is no more result to return
+  public abstract T getNextBatch();
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
new file mode 100644
index 0000000..9075f64
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
@@ -0,0 +1,24 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * LimitOperator is used to select top n result. It uses the default order of upstream operators
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class LimitOperator extends InternalOperator<Tablet> {
+
+  // The limit count
+  private int limit;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
new file mode 100644
index 0000000..c6f79ac
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of
+ * upstream operators
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class OffsetOperator extends InternalOperator<Tablet> {
+
+  // The number of results to skip
+  private int offset;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
new file mode 100644
index 0000000..96a5c86
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
+
+/**
+ * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. This
+ * operator will split data in one series into many groups by time range and do the aggregation
+ * calculation for each group. If there is no split parameter, it will return one result which is
+ * the aggregation result of all data in current series.
+ *
+ * <p>Children type: [SeriesScanOperator]
+ */
+public class SeriesAggregateOperator extends InternalOperator<SeriesBatchAggInfo> {
+
+  // The parameter of `group by time`
+  // Its value will be null if there is no `group by time` clause,
+  private GroupByTimeParameter groupByTimeParameter;
+
+  // TODO: need consider how to represent the aggregation function and corresponding implementation
+  // We use a String to indicate the parameter temporarily
+  private String aggregationFunc;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchAggInfo getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
new file mode 100644
index 0000000..fa83be7
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+
+/**
+ * In general, the parameter in sortOperator should be pushed down to the upstream operators. In our
+ * optimized logical query plan, the sortOperator should not appear.
+ */
+public class SortOperator extends InternalOperator<Tablet> {
+
+  private TraversalOrder sortOrder;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
new file mode 100644
index 0000000..c4ae6f3
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+
+/**
+ * TimeJoinOperator is responsible for join two or more series. The join algorithm is like outer
+ * join by timestamp column. The output result of TimeJoinOperator is sorted by timestamp
+ *
+ * <p>Children type: [SeriesScanOperator]
+ */
+public class TimeJoinOperator extends InternalOperator<Tablet> {
+
+  // This parameter indicates the order when executing multiway merge sort.
+  private TraversalOrder mergeOrder;
+
+  // The policy to decide whether a row should be discarded
+  // The without policy can be pushed down to the TimeJoinOperator because whether a row
+  // contains null is already known in this operator, and that will not be changed by the
+  // downstream operators.
+  private WithoutPolicy withoutPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
new file mode 100644
index 0000000..535a3d6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+
+/**
+ * WithoutOperator is used to discard specific result from upstream operators.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class WithoutOperator extends InternalOperator<Tablet> {
+
+  // The policy to discard the result from upstream operator
+  private WithoutPolicy discardPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
new file mode 100644
index 0000000..9cd6043
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific
+ * series. When reading data, the SeriesScanOperator can read the raw data batch by batch. And also,
+ * it can leverage the filter and other info to decrease the result set. Besides, the
+ * SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
+ * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated
+ * statistic or has overlap with pre-aggregated statistic, the SeriesScanOperator will read the raw
+ * data and calculate the aggregation result for specific time range.
+ *
+ * <p>Children type: []
+ */
+public class SeriesScanOperator extends InternalOperator<SeriesBatchData> {
+
+  // The path of the target series which will be scanned.
+  private Path seriesPath;
+
+  // The order to traverse the data.
+  // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
+  // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
+  private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
+
+  // Filter data in current series.
+  private Filter filter;
+
+  // Limit for result set. The default value is -1, which means no limit
+  private int limit = -1;
+
+  // offset for result set. The default value is 0
+  private int offset;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  // This method will only be invoked by SeriesAggregateOperator
+  // It will return the statistics of the series in given time range
+  // When calculating the statistics, the operator should use the most optimized way to do that.
+  // In other words, computing them from raw data
+  // is the last resort.
+  public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
+    return null;
+  }
+}

[iotdb] 05/11: move query distribution folder to cluster

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 12d7af3b88d3e49fe2e2a4c3c872e330ae8f26c5
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Mar 7 13:34:55 2022 +0800

    move query distribution folder to cluster
---
 .../iotdb/cluster/query/distribution/common/FillPolicy.java       | 5 +++++
 .../cluster}/query/distribution/common/GroupByTimeParameter.java  | 2 +-
 .../iotdb/cluster}/query/distribution/common/LevelBucketInfo.java | 2 +-
 .../cluster}/query/distribution/common/SeriesBatchAggInfo.java    | 2 +-
 .../iotdb/cluster}/query/distribution/common/SeriesBatchData.java | 2 +-
 .../apache/iotdb/cluster}/query/distribution/common/Tablet.java   | 2 +-
 .../iotdb/cluster}/query/distribution/common/TabletMetadata.java  | 4 +---
 .../iotdb/cluster}/query/distribution/common/TraversalOrder.java  | 2 +-
 .../apache/iotdb/cluster}/query/distribution/common/TreeNode.java | 2 +-
 .../iotdb/cluster}/query/distribution/common/WithoutPolicy.java   | 2 +-
 .../cluster}/query/distribution/operator/DeviceMergeOperator.java | 6 +++---
 .../iotdb/cluster}/query/distribution/operator/ExecOperator.java  | 4 ++--
 .../iotdb/cluster}/query/distribution/operator/FillOperator.java  | 7 +++----
 .../cluster}/query/distribution/operator/FilterExecOperator.java  | 4 ++--
 .../query/distribution/operator/GroupByLevelOperator.java         | 8 ++++----
 .../iotdb/cluster}/query/distribution/operator/LimitOperator.java | 4 ++--
 .../cluster}/query/distribution/operator/OffsetOperator.java      | 4 ++--
 .../query/distribution/operator/SeriesAggregateOperator.java      | 6 +++---
 .../cluster}/query/distribution/operator/SeriesScanOperator.java  | 6 +++---
 .../iotdb/cluster}/query/distribution/operator/SortOperator.java  | 8 ++++----
 .../cluster}/query/distribution/operator/TimeJoinOperator.java    | 8 ++++----
 .../cluster}/query/distribution/operator/WithoutOperator.java     | 6 +++---
 .../org/apache/iotdb/db/query/distribution/common/FillPolicy.java | 5 -----
 23 files changed, 49 insertions(+), 52 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/FillPolicy.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/FillPolicy.java
new file mode 100644
index 0000000..a81d6fe
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/FillPolicy.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.cluster.query.distribution.common;
+
+public enum FillPolicy {
+    PREVIOUS,
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/GroupByTimeParameter.java
similarity index 83%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/GroupByTimeParameter.java
index 3f7e1bc..23ad3a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/GroupByTimeParameter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/GroupByTimeParameter.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java
index 6888475..44ee37d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/LevelBucketInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 import java.util.List;
 import java.util.Map;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java
index 2ea9f9a..c1726d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchAggInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.Path;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchData.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java
index 7a9ae65..3384d49 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchData.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/Tablet.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/Tablet.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java
index 956ccb0..dde6fa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/Tablet.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TabletMetadata.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/TabletMetadata.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java
index 0a01959..cdf827e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TabletMetadata.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java
@@ -1,6 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
-
-import org.apache.iotdb.tsfile.read.common.Path;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 import java.util.List;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java
index 3b1bc92..ea88253 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 /**
  * The traversal order for operators by timestamp
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TreeNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TreeNode.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/TreeNode.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TreeNode.java
index 9cf2dd3..a242660 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TreeNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TreeNode.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 import java.util.List;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/WithoutPolicy.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/WithoutPolicy.java
similarity index 51%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/common/WithoutPolicy.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/WithoutPolicy.java
index 02d1e43..794be12 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/WithoutPolicy.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/WithoutPolicy.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.query.distribution.common;
+package org.apache.iotdb.cluster.query.distribution.common;
 
 public enum WithoutPolicy {
     CONTAINS_NULL,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
index 45e52e4..1800179 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/DeviceMergeOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.Tablet;
-import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 
 import java.util.List;
 import java.util.Map;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/ExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/ExecOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
index b9a1691..883589b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/ExecOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
@@ -1,6 +1,6 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.TreeNode;
+import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
 
 /**
  * @author xingtanzjr
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
similarity index 64%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FillOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
index 306c2b9..9390e59 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FillOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
@@ -1,8 +1,7 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.FillPolicy;
-import org.apache.iotdb.db.query.distribution.common.Tablet;
-import org.apache.iotdb.db.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 
 /**
  * FillOperator is used to fill the empty field in one row.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FilterExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FilterExecOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
index 65d2b8e..efe4609 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/FilterExecOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
-import org.apache.iotdb.db.query.distribution.common.Tablet;
 
 /**
  * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node IoTDB)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
index f5fe41d..729c84f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/GroupByLevelOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.db.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.db.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 
 /**
  * This operator is responsible for the final aggregation merge operation.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/LimitOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
index 332c878..a48e664 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/LimitOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
@@ -1,6 +1,6 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 
 /**
  * LimitOperator is used to select top n result. It uses the default order of upstream operators
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/OffsetOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
index c8c0ec4..11cd7de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/OffsetOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
@@ -1,6 +1,6 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 
 /**
  * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
index e5d02f8..e566f49 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesAggregateOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.db.query.distribution.common.SeriesBatchAggInfo;
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
 
 /**
  * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
index 94bc7da..46077bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.SeriesBatchData;
-import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
similarity index 58%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SortOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
index c1e2b0a..ab3f01d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SortOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
@@ -1,13 +1,13 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.Tablet;
-import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 
 /**
  * In general, the parameter in sortOperator should be pushed down to the upstream operators.
  * In our optimized logical query plan, the sortOperator should not appear.
  */
-public class SortOperator extends ExecOperator<Tablet>{
+public class SortOperator extends ExecOperator<Tablet> {
 
     private TraversalOrder sortOrder;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
similarity index 76%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
index dbf1deb..c573495 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
@@ -1,8 +1,8 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.Tablet;
-import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.db.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 
 /**
  * TimeJoinOperator is responsible for join two or more series.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
similarity index 70%
rename from server/src/main/java/org/apache/iotdb/db/query/distribution/operator/WithoutOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
index 75250b9..f25d435 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/WithoutOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
@@ -1,7 +1,7 @@
-package org.apache.iotdb.db.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.db.query.distribution.common.Tablet;
-import org.apache.iotdb.db.query.distribution.common.WithoutPolicy;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 
 /**
  * WithoutOperator is used to discard specific result from upstream operators.
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/FillPolicy.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/FillPolicy.java
deleted file mode 100644
index ba59bad..0000000
--- a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/FillPolicy.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.iotdb.db.query.distribution.common;
-
-public enum FillPolicy {
-    PREVIOUS,
-}

[iotdb] 10/11: complete basic definition of all nodes

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fe5d9d228ebff8d3dc5ef0739fcfbf8e282ffd99
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 22:18:43 2022 +0800

    complete basic definition of all nodes
---
 .../query/distribution/common/LevelBucketInfo.java | 15 ------
 .../common/{TraversalOrder.java => OrderBy.java}   |  2 +-
 .../distribution/common/SeriesBatchAggInfo.java    | 21 --------
 .../query/distribution/common/SeriesBatchData.java | 14 -----
 .../cluster/query/distribution/plan/PlanNode.java  |  7 ++-
 .../query/distribution/plan/PlanNodeId.java        | 34 ++++++++++++
 .../distribution/plan/process/DeviceMergeNode.java | 40 ++++++++++----
 .../query/distribution/plan/process/FillNode.java  | 17 ++++--
 .../distribution/plan/process/FilterNode.java      | 19 ++++---
 .../plan/process/GroupByLevelNode.java             | 34 ++++++------
 .../query/distribution/plan/process/LimitNode.java | 17 ++++--
 .../distribution/plan/process/OffsetNode.java      | 17 ++++--
 .../distribution/plan/process/ProcessNode.java     |  7 ++-
 .../plan/process/SeriesAggregateNode.java          | 36 -------------
 .../query/distribution/plan/process/SortNode.java  | 23 +++++---
 .../distribution/plan/process/TimeJoinNode.java    | 40 +++++++++++---
 .../distribution/plan/process/WithoutNode.java     | 18 ++++---
 .../query/distribution/plan/sink/CsvSinkNode.java  | 13 ++++-
 .../distribution/plan/sink/FragmentSinkNode.java   | 20 +++++++
 .../query/distribution/plan/sink/SinkNode.java     | 10 +++-
 .../distribution/plan/sink/ThriftSinkNode.java     | 16 +++++-
 .../distribution/plan/source/CsvSourceNode.java    | 11 +++-
 .../plan/source/SeriesAggregateNode.java           | 63 ++++++++++++++++++++++
 .../distribution/plan/source/SeriesScanNode.java   | 45 ++++++++++------
 .../query/distribution/plan/source/SourceNode.java |  8 ++-
 25 files changed, 363 insertions(+), 184 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java
deleted file mode 100644
index 44ee37d..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.common;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class is used to store all the buckets for the GroupByLevelOperator
- * It stores the levels index and all the enumerated values in each level by a HashMap
- * Using the HashMap, the operator could calculate all the buckets using combination of values from each level
- */
-public class LevelBucketInfo {
-    // eg: If the clause is `group by level = 1, 2, 3`, the map should be like
-    // map{1 -> ['a', 'b'], 2 -> ['aa', 'bb'], 3 -> ['aaa', 'bbb']}
-    private Map<Integer, List<String>> levelMap;
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java
similarity index 87%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java
index ea88253..07f005f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java
@@ -3,7 +3,7 @@ package org.apache.iotdb.cluster.query.distribution.common;
 /**
  * The traversal order for operators by timestamp
  */
-public enum TraversalOrder {
+public enum OrderBy {
     TIMESTAMP_ASC,
     TIMESTAMP_DESC,
     DEVICE_NAME_ASC,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java
deleted file mode 100644
index c1726d5..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.common;
-
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-
-/**
- * SeriesBatchAggInfo is the "batch" result of SeriesAggregateOperator when its getNextBatch() is invoked.
- */
-public class SeriesBatchAggInfo {
-    // Path of the series.
-    // Path will be used in the downstream operators.
-    // GroupByLevelOperator will use it to divide the data into different buckets to do the rollup operation.
-    private Path path;
-
-    // Time range of current statistic.
-    private TimeRange timeRange;
-
-    // Statistics for the series in current time range
-    private Statistics<?> statistics;
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java
deleted file mode 100644
index 3384d49..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.common;
-
-import org.apache.iotdb.tsfile.read.common.BatchData;
-
-/**
- * @author xingtanzjr
- * TODO: currently we only use it to describe the result set of SeriesScanOperator
- * The BatchData is suitable as the encapsulation of part of result set of SeriesScanOperator
- * BatchData is the class defined and generally used in single-node IoTDB
- * We leverage it as the `batch` here. We can consider a more general name or make some modifications for it.
- */
-public class SeriesBatchData extends BatchData {
-
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
index 527b697..d4b763d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java
@@ -7,6 +7,9 @@ import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
  * The base class of query executable operators, which is used to compose logical query plan.
  * TODO: consider how to restrict the children type for each type of ExecOperator
  */
-public abstract class PlanNode<T> extends TreeNode<PlanNode<?>> {
-
+public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> {
+    private PlanNodeId id;
+    public PlanNode(PlanNodeId id) {
+        this.id = id;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java
new file mode 100644
index 0000000..b9d2888
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.iotdb.cluster.query.distribution.plan;
+
+public class PlanNodeId {
+    private String id;
+    public PlanNodeId(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    @Override
+    public String toString() {
+        return this.id;
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
index fbadebe..9cd2549 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java
@@ -1,8 +1,10 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.OrderBy;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 import java.util.List;
 import java.util.Map;
@@ -11,18 +13,36 @@ import java.util.Map;
  * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And output the result with
  * specific order. The order could be 'order by device' or 'order by timestamp'
  *
- * The types of involved devices should be same. If the device contains n series, the device-based view will contain n+2
- * columns, which are timestamp column, device name column and n value columns of involved series.
+ * Each output from its children should have the same schema. That means, the columns should be same between these TsBlocks.
+ * If the input TsBlock contains n columns, the device-based view will contain n+1 columns where the new column is Device
+ * column.
  *
- * Children type: [TimeJoinOperator]
  */
-public class DeviceMergeNode extends ProcessNode<TsBlock> {
+public class DeviceMergeNode extends ProcessNode {
     // The result output order that this operator
-    private TraversalOrder mergeOrder;
+    private OrderBy mergeOrder;
 
-    // Owned devices
-    private List<String> ownedDeviceNameList;
+    // The policy to decide whether a row should be discarded
+    // The without policy can be pushed down to the DeviceMergeNode because we can know whether a row contains
+    // null or not.
+    private WithoutPolicy withoutPolicy;
 
-    // The map from deviceName to corresponding query result operator responsible for that device.
-    private Map<String, TimeJoinNode> upstreamMap;
+    // The map from deviceName to the child node whose output TsBlock contains the data belonging to that device.
+    // Initialized eagerly so addChildDeviceNode() does not hit a null map after DeviceMergeNode(PlanNodeId).
+    private Map<String, PlanNode<TsBlock>> childDeviceNodeMap = new java.util.HashMap<>();
+
+    public DeviceMergeNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) {
+        this(id);
+        this.childDeviceNodeMap = deviceNodeMap;
+        this.children.addAll(deviceNodeMap.values());
+    }
+
+    public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) {
+        this.childDeviceNodeMap.put(deviceName, childNode);
+        this.children.add(childNode);
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
index 8b08640..430948d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java
@@ -3,14 +3,23 @@ package org.apache.iotdb.cluster.query.distribution.plan.process;
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * FillOperator is used to fill the empty field in one row.
+ * FillNode is used to fill the empty field in one row.
  *
- * Children type: [All the operators whose result set is Tablet]
  */
-public class FillNode extends ProcessNode<TsBlock> {
+public class FillNode extends ProcessNode {
 
-    // The policy to discard the result from upstream operator
+    // The policy used to fill the empty fields in the rows from the upstream node
     private FillPolicy fillPolicy;
+
+    public FillNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
+        this(id);
+        this.fillPolicy = fillPolicy;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
index a579d4e..e7a4fce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java
@@ -1,17 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
 /**
- * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node IoTDB)
- * The FilterExecOperator is responsible to filter the RowRecord from Tablet.
- *
- * Children type: [All the operators whose result set is Tablet]
+ * The FilterNode is responsible for filtering the RowRecords of a TsBlock.
  */
-public class FilterNode extends ProcessNode<TsBlock> {
+public class FilterNode extends ProcessNode {
 
     // The filter
     private FilterOperator rowFilter;
+
+    public FilterNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public FilterNode(PlanNodeId id, FilterOperator rowFilter) {
+        this(id);
+        this.rowFilter = rowFilter;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
index 2007900..54ff8eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java
@@ -1,25 +1,25 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * This operator is responsible for the final aggregation merge operation.
- * It will arrange the data by time range firstly. And inside each time range, the data from same measurement and
- * different devices will be rolled up by corresponding level into different buckets.
- * If the bucketInfo is empty, the data from `same measurement and different devices` won't be rolled up.
- * If the groupByTimeParameter is null, the data won't be split by time range.
- *
- * Children type: [SeriesAggregateOperator]
+ * This node is responsible for the final aggregation merge operation.
+ * It will process the data from TsBlock row by row.
+ * For one row, it will rollup the fields which have the same aggregate function and belong to one bucket.
+ * Here, that two columns belong to one bucket means the partial paths of device after rolling up in specific level
+ * are the same.
+ * For example, let's say there are two columns `root.sg.d1.s1` and `root.sg.d2.s1`.
+ * If the group by level parameter is [0, 1], then these two columns will belong to one bucket and the bucket name
+ * is `root.sg.*.s1`.
+ * If the group by level parameter is [0, 2], then these two columns will not belong to one bucket. And the total buckets
+ * are `root.*.d1.s1` and `root.*.d2.s1`
  */
-public class GroupByLevelNode extends ProcessNode<TsBlock> {
+public class GroupByLevelNode extends ProcessNode {
 
-    // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
-    private LevelBucketInfo bucketInfo;
+    private int[] groupByLevels;
 
-    // The parameter of `group by time`
-    // The GroupByLevelOperator also need GroupByTimeParameter
-    private GroupByTimeParameter groupByTimeParameter;
+    public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) {
+        super(id);
+        this.groupByLevels = groupByLevels;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
index e675086..63d4370 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java
@@ -1,15 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * LimitOperator is used to select top n result. It uses the default order of upstream operators
+ * LimitNode is used to select top n result. It uses the default order of upstream nodes
  *
- * Children type: [All the operators whose result set is Tablet]
  */
-public class LimitNode extends ProcessNode<TsBlock> {
+public class LimitNode extends ProcessNode {
 
     // The limit count
     private int limit;
+
+    public LimitNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public LimitNode(PlanNodeId id, int limit) {
+        this(id);
+        this.limit = limit;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
index 78ac9eb..3ee19ab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java
@@ -1,15 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
+ * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of upstream nodes
  *
- * Children type: [All the operators whose result set is Tablet]
  */
-public class OffsetNode extends ProcessNode<TsBlock> {
+public class OffsetNode extends ProcessNode {
 
     // The limit count
     private int offset;
+
+    public OffsetNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public OffsetNode(PlanNodeId id, int offset) {
+        this(id);
+        this.offset = offset;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
index e7bffae..d5165b0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java
@@ -1,6 +1,11 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-public class ProcessNode<T> extends PlanNode<T> {
+public class ProcessNode extends PlanNode<TsBlock> {
+    public ProcessNode(PlanNodeId id) {
+        super(id);
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
deleted file mode 100644
index 46ef991..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.plan.process;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-
-/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
- * This operator will split data in one series into many groups by time range and do the aggregation calculation for each
- * group.
- * If there is no split parameter, it will return one result which is the aggregation result of all data in current series.
- *
- * Children type: [SeriesScanOperator]
- */
-public class SeriesAggregateNode extends ProcessNode<TsBlock> {
-
-    // The parameter of `group by time`
-    // Its value will be null if there is no `group by time` clause,
-    private GroupByTimeParameter groupByTimeParameter;
-
-    // TODO: need consider how to represent the aggregation function and corresponding implementation
-    // We use a String to indicate the parameter temporarily
-    private String aggregationFunc;
-
-    // This method will only be invoked by SeriesAggregateOperator
-    // It will return the statistics of the series in given time range
-    // When calculate the statistics, the operator should use the most optimized way to do that. In
-    // other words, using
-    // raw data is the final way to do that.
-    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
index a718247..705a4c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java
@@ -1,15 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.common.OrderBy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * In general, the parameter in sortOperator should be pushed down to the upstream operators.
- * In our optimized logical query plan, the sortOperator should not appear.
+ * In general, the parameter in sortNode should be pushed down to the upstream operators.
+ * In our optimized logical query plan, the sortNode should not appear.
  */
-public class SortNode extends ProcessNode<TsBlock> {
+public class SortNode extends ProcessNode {
 
-    private TraversalOrder sortOrder;
-    
+    private OrderBy sortOrder;
+
+    public SortNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public SortNode(PlanNodeId id, OrderBy sortOrder) {
+        this(id);
+        this.sortOrder = sortOrder;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
index 08a7617..ffbeb29 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java
@@ -1,24 +1,48 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
 import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.OrderBy;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+
+import java.util.Arrays;
 
 /**
- * TimeJoinOperator is responsible for join two or more series.
- * The join algorithm is like outer join by timestamp column.
+ * TimeJoinOperator is responsible for joining two or more TsBlocks.
+ * The join algorithm is like an outer join on the timestamp column: it joins two or more TsBlocks by their timestamp column.
  * The output result of TimeJoinOperator is sorted by timestamp
- *
- * Children type: [SeriesScanOperator]
  */
-public class TimeJoinNode extends ProcessNode<TsBlock> {
+//TODO: define the TimeJoinMergeNode for distributed plan
+public class TimeJoinNode extends ProcessNode {
 
     // This parameter indicates the order when executing multiway merge sort.
-    private TraversalOrder mergeOrder;
+    private OrderBy mergeOrder;
 
     // The policy to decide whether a row should be discarded
     // The without policy is able to be push down to the TimeJoinOperator because we can know whether a row contains
-    // null or not in this operator the situation won't be changed by the downstream operators.
+    // null or not.
     private WithoutPolicy withoutPolicy;
+
+    public TimeJoinNode(PlanNodeId id) {
+        super(id);
+        this.mergeOrder = OrderBy.TIMESTAMP_ASC;
+    }
+
+    public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) {
+        this(id); // delegate so mergeOrder gets the same TIMESTAMP_ASC default as TimeJoinNode(PlanNodeId)
+        this.children.addAll(Arrays.asList(children));
+    }
+
+    public void addChild(PlanNode<TsBlock> child) {
+        this.children.add(child);
+    }
+
+    public void setMergeOrder(OrderBy mergeOrder) {
+        this.mergeOrder = mergeOrder;
+    }
+
+    public void setWithoutPolicy(WithoutPolicy withoutPolicy) {
+        this.withoutPolicy = withoutPolicy;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
index b3f1d9e..6afc25b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java
@@ -1,16 +1,22 @@
 package org.apache.iotdb.cluster.query.distribution.plan.process;
 
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
 /**
- * WithoutOperator is used to discard specific result from upstream operators.
- *
- * Children type: [All the operators whose result set is Tablet]
+ * WithoutNode is used to discard specific rows from upstream node.
  */
-public class WithoutNode extends ProcessNode<TsBlock> {
+public class WithoutNode extends ProcessNode {
 
     // The policy to discard the result from upstream operator
     private WithoutPolicy discardPolicy;
+
+    public WithoutNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public WithoutNode(PlanNodeId id, WithoutPolicy discardPolicy) {
+        this(id);
+        this.discardPolicy = discardPolicy;
+    }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
index 3e9dc4c..6bafca7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java
@@ -19,11 +19,20 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+
+public class CsvSinkNode extends SinkNode {
+  public CsvSinkNode(PlanNodeId id) {
+    super(id);
+  }
 
-public class CsvSinkNode extends SinkNode<SeriesBatchData> {
   @Override
   public void close() throws Exception {
 
   }
+
+  @Override
+  public void send() {
+
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java
new file mode 100644
index 0000000..348c00d
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java
@@ -0,0 +1,20 @@
+package org.apache.iotdb.cluster.query.distribution.plan.sink;
+
+
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+
+public class FragmentSinkNode extends SinkNode {
+    public FragmentSinkNode(PlanNodeId id) {
+        super(id);
+    }
+
+    @Override
+    public void send() {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
index a94a0ff..de8ab9a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java
@@ -19,9 +19,15 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-// 构建与客户端的联系。
-public abstract class SinkNode<T> extends PlanNode<T> implements AutoCloseable {
+public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable {
 
+    public SinkNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public abstract void send();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
index beed5a0..17db1f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java
@@ -19,10 +19,22 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.sink;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-public class ThriftSinkNode extends SinkNode<SeriesBatchData> {
+/**
+ * not implemented in current IoTDB yet
+ */
+public class ThriftSinkNode extends SinkNode {
+
+  public ThriftSinkNode(PlanNodeId id) {
+    super(id);
+  }
 
   @Override
   public void close() throws Exception {}
+
+  @Override
+  public void send() {
+
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
index 85f402a..5ff967e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java
@@ -19,9 +19,16 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-public class CsvSourceNode extends SourceNode<SeriesBatchData> {
+/**
+ * Not implemented in current version.
+ */
+public class CsvSourceNode extends SourceNode {
+
+  public CsvSourceNode(PlanNodeId id) {
+    super(id);
+  }
 
   @Override
   public void close() throws Exception {}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java
new file mode 100644
index 0000000..0a7de03
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java
@@ -0,0 +1,63 @@
+package org.apache.iotdb.cluster.query.distribution.plan.source;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * SeriesAggregateNode is responsible for doing the aggregation calculation for one series. It will read the
+ * target series and calculate the aggregation result from the aggregation digest or the raw data of this series.
+ *
+ * The aggregation result will be represented as a TsBlock
+ *
+ * This operator will split data of the target series into many groups by time range and do the aggregation calculation
+ * for each group. Each result will be one row of the result TsBlock. The timestamp of each row is the start time of the
+ * time range group.
+ *
+ * If there is no time range split parameter, the result TsBlock will only contain one row, which represents the whole
+ * aggregation result of this series. The timestamp of that row will be 0, which is meaningless.
+ */
+public class SeriesAggregateNode extends SourceNode {
+
+    // The parameter of `group by time`
+    // Its value will be null if there is no `group by time` clause,
+    private GroupByTimeParameter groupByTimeParameter;
+
+    // The aggregation function, which contains the function name and related series.
+    // (Currently we only support one series in the aggregation function)
+    // TODO: consider whether FunctionExpression is a suitable representation of the aggregation function
+    private FunctionExpression aggregateFunc;
+
+    private Filter filter;
+
+    public SeriesAggregateNode(PlanNodeId id) {
+        super(id);
+    }
+
+    public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) {
+        this(id);
+        this.aggregateFunc = aggregateFunc;
+    }
+
+    public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) {
+        this(id, aggregateFunc);
+        this.groupByTimeParameter = groupByTimeParameter;
+    }
+
+    @Override
+    public void open() throws Exception {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+
+    // This method is used when do the PredicatePushDown.
+    // The filter is not put in the constructor because the filter is only clear in the predicate push-down stage
+    public void setFilter(Filter filter) {
+        this.filter = filter;
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
index 7ff8978..738a6ea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java
@@ -1,24 +1,18 @@
 package org.apache.iotdb.cluster.query.distribution.plan.source;
 
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.cluster.query.distribution.common.OrderBy;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 /**
- * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific
- * series. When reading data, the SeriesScanOperator can read the raw data batch by batch. And also,
- * it can leverage the filter and other info to decrease the result set. Besides, the
- * SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
- * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated
- * statistic or has overlap with pre-aggregated statistic, the SeriesScanOperator will read the raw
- * data and calculate the aggregation result for specific time range.
+ * SeriesScanOperator is responsible for reading the data of a specific series. When reading data, the SeriesScanOperator
+ * can read the raw data batch by batch. It can also leverage the filter and other info to reduce the
+ * result set.
  *
- * <p>Children type: []
+ * <p>Children type: no child is allowed for SeriesScanNode
  */
-public class SeriesScanNode extends SourceNode<TsBlock> {
+public class SeriesScanNode extends SourceNode {
 
   // The path of the target series which will be scanned.
   private Path seriesPath;
@@ -26,20 +20,41 @@ public class SeriesScanNode extends SourceNode<TsBlock> {
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-  private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
+  private OrderBy scanOrder = OrderBy.TIMESTAMP_ASC;
 
   // Filter data in current series.
   private Filter filter;
 
   // Limit for result set. The default value is -1, which means no limit
-  private int limit = -1;
+  private int limit;
 
   // offset for result set. The default value is 0
   private int offset;
 
+  public SeriesScanNode(PlanNodeId id, Path seriesPath) {
+    super(id);
+    this.seriesPath = seriesPath;
+  }
+
+  public void setFilter(Filter filter) {
+    this.filter = filter;
+  }
+
   @Override
   public void close() throws Exception {}
 
   @Override
   public void open() throws Exception {}
+
+  public void setScanOrder(OrderBy scanOrder) {
+    this.scanOrder = scanOrder;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  public void setOffset(int offset) {
+    this.offset = offset;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
index ed0e39a..b72d0f7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java
@@ -19,9 +19,15 @@
 
 package org.apache.iotdb.cluster.query.distribution.plan.source;
 
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.plan.PlanNode;
+import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId;
 
-public abstract class SourceNode<T> extends PlanNode<T> implements AutoCloseable{
+public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable{
+
+  public SourceNode(PlanNodeId id) {
+    super(id);
+  }
 
   public abstract void open() throws Exception;
 }

[iotdb] 06/11: rename operator to node

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e11bd634d54573b187f44c913ebf090b3661d9f9
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Mar 10 19:19:27 2022 +0800

    rename operator to node
---
 .../cluster/query/distribution/common/Tablet.java  | 29 ---------------------
 .../cluster/query/distribution/common/TsBlock.java | 30 ++++++++++++++++++++++
 .../{TabletMetadata.java => TsBlockMetadata.java}  |  2 +-
 ...viceMergeOperator.java => DeviceMergeNode.java} |  8 +++---
 .../operator/{FillOperator.java => FillNode.java}  |  6 ++---
 ...FilterExecOperator.java => FilterExecNode.java} |  6 ++---
 ...pByLevelOperator.java => GroupByLevelNode.java} |  6 ++---
 .../{LimitOperator.java => LimitNode.java}         |  6 ++---
 .../{OffsetOperator.java => OffsetNode.java}       |  6 ++---
 .../operator/{ExecOperator.java => PlanNode.java}  |  2 +-
 ...egateOperator.java => SeriesAggregateNode.java} |  2 +-
 ...SeriesScanOperator.java => SeriesScanNode.java} |  2 +-
 .../operator/{SortOperator.java => SortNode.java}  |  6 ++---
 .../{TimeJoinOperator.java => TimeJoinNode.java}   |  6 ++---
 .../{WithoutOperator.java => WithoutNode.java}     |  6 ++---
 15 files changed, 62 insertions(+), 61 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java
deleted file mode 100644
index dde6fa7..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/Tablet.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.common;
-
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-/**
- * Intermediate result for most of ExecOperators.
- * The Tablet contains data from one or more series and constructs them as a row based view
- * The Tablet constructed with n series has n+1 columns where one column is timestamp and the other n columns are values
- * from each series.
- * The Tablet also contains the metadata of owned series.
- *
- * TODO: consider the detailed data store model in memory. (using column based or row based ?)
- */
-public class Tablet {
-
-    private TabletMetadata metadata;
-
-    public boolean hasNext() {
-        return false;
-    }
-
-    public RowRecord getNext() {
-        return null;
-    }
-
-    public TabletMetadata getMetadata() {
-        return metadata;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlock.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlock.java
new file mode 100644
index 0000000..73fb008
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlock.java
@@ -0,0 +1,30 @@
+package org.apache.iotdb.cluster.query.distribution.common;
+
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+/**
+ * Intermediate result passed between most of the PlanNodes (formerly named ExecOperators).
+ * A TsBlock contains data from one or more columns and exposes them as a row-based view.
+ * The columns can be series, aggregation results for one series, or scalar values (such as deviceName).
+ * The TsBlock also carries the metadata that describes the columns.
+ *
+ * TODO: consider the detailed data store model in memory. (using column based or row based ?)
+ */
+public class TsBlock {
+
+    // Describes the columns of this block; nothing in this class assigns it yet, so it is still null
+    private TsBlockMetadata metadata;
+
+    public boolean hasNext() { // whether another row can be read; currently always false
+        return false;
+    }
+
+    // Get the next row of the current TsBlock; currently always returns null
+    public RowRecord getNext() {
+        return null;
+    }
+
+    public TsBlockMetadata getMetadata() { // returns the column description held by this block
+        return metadata;
+    }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlockMetadata.java
similarity index 96%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlockMetadata.java
index cdf827e..cd95d82 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TabletMetadata.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TsBlockMetadata.java
@@ -2,7 +2,7 @@ package org.apache.iotdb.cluster.query.distribution.common;
 
 import java.util.List;
 
-public class TabletMetadata {
+public class TsBlockMetadata {
     // list of all columns in current Tablet
     // The column list not only contains the series column, but also contains other column to construct the final result
     // set such as timestamp and deviceName
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
similarity index 85%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
index 1800179..ff831d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeNode.java
@@ -1,6 +1,6 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 
 import java.util.List;
@@ -15,7 +15,7 @@ import java.util.Map;
  *
  * Children type: [TimeJoinOperator]
  */
-public class DeviceMergeOperator extends ExecOperator<Tablet> {
+public class DeviceMergeNode extends PlanNode<TsBlock> {
     // The result output order that this operator
     private TraversalOrder mergeOrder;
 
@@ -23,7 +23,7 @@ public class DeviceMergeOperator extends ExecOperator<Tablet> {
     private List<String> ownedDeviceNameList;
 
     // The map from deviceName to corresponding query result operator responsible for that device.
-    private Map<String, TimeJoinOperator> upstreamMap;
+    private Map<String, TimeJoinNode> upstreamMap;
 
     @Override
     public boolean hasNext() {
@@ -34,7 +34,7 @@ public class DeviceMergeOperator extends ExecOperator<Tablet> {
     // the additional column is `deviceName`
     // And, the `alignedByDevice` in the TabletMetadata will be `true`
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
similarity index 75%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
index 9390e59..a8f0076 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillNode.java
@@ -1,14 +1,14 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 
 /**
  * FillOperator is used to fill the empty field in one row.
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FillOperator extends ExecOperator<Tablet> {
+public class FillNode extends PlanNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private FillPolicy fillPolicy;
@@ -19,7 +19,7 @@ public class FillOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
similarity index 77%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
index efe4609..0bf0ad4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecNode.java
@@ -1,6 +1,6 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
 /**
@@ -9,7 +9,7 @@ import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class FilterExecOperator extends ExecOperator<Tablet> {
+public class FilterExecNode extends PlanNode<TsBlock> {
 
     // The filter
     private FilterOperator rowFilter;
@@ -20,7 +20,7 @@ public class FilterExecOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
similarity index 87%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
index 729c84f..38448d5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelNode.java
@@ -2,7 +2,7 @@ package org.apache.iotdb.cluster.query.distribution.operator;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 
 /**
  * This operator is responsible for the final aggregation merge operation.
@@ -13,7 +13,7 @@ import org.apache.iotdb.cluster.query.distribution.common.Tablet;
  *
  * Children type: [SeriesAggregateOperator]
  */
-public class GroupByLevelOperator extends ExecOperator<Tablet> {
+public class GroupByLevelNode extends PlanNode<TsBlock> {
 
     // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
     private LevelBucketInfo bucketInfo;
@@ -28,7 +28,7 @@ public class GroupByLevelOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
similarity index 71%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
index a48e664..875f8c4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitNode.java
@@ -1,13 +1,13 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 
 /**
  * LimitOperator is used to select top n result. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class LimitOperator extends ExecOperator<Tablet> {
+public class LimitNode extends PlanNode<TsBlock> {
 
     // The limit count
     private int limit;
@@ -18,7 +18,7 @@ public class LimitOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
similarity index 72%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
index 11cd7de..6de44c5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetNode.java
@@ -1,13 +1,13 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 
 /**
  * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class OffsetOperator extends ExecOperator<Tablet> {
+public class OffsetNode extends PlanNode<TsBlock> {
 
     // The limit count
     private int offset;
@@ -18,7 +18,7 @@ public class OffsetOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
similarity index 88%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
index 883589b..ae30645 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/PlanNode.java
@@ -7,7 +7,7 @@ import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
  * The base class of query executable operators, which is used to compose logical query plan.
  * TODO: consider how to restrict the children type for each type of ExecOperator
  */
-public abstract class ExecOperator<T> extends TreeNode<ExecOperator<?>> {
+public abstract class PlanNode<T> extends TreeNode<PlanNode<?>> {
 
     // Judge whether current operator has more result
     public abstract boolean hasNext();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
similarity index 93%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
index e566f49..62fb223 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateNode.java
@@ -11,7 +11,7 @@ import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
  *
  * Children type: [SeriesScanOperator]
  */
-public class SeriesAggregateOperator extends ExecOperator<SeriesBatchAggInfo> {
+public class SeriesAggregateNode extends PlanNode<SeriesBatchAggInfo> {
 
     // The parameter of `group by time`
     // Its value will be null if there is no `group by time` clause,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
similarity index 96%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
index 46077bd..1c8ac3d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanNode.java
@@ -18,7 +18,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  *
  * Children type: []
  */
-public class SeriesScanOperator extends ExecOperator<SeriesBatchData> {
+public class SeriesScanNode extends PlanNode<SeriesBatchData> {
 
     // The path of the target series which will be scanned.
     private Path seriesPath;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
similarity index 74%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
index ab3f01d..8bcc520 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortNode.java
@@ -1,13 +1,13 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 
 /**
  * In general, the parameter in sortOperator should be pushed down to the upstream operators.
  * In our optimized logical query plan, the sortOperator should not appear.
  */
-public class SortOperator extends ExecOperator<Tablet> {
+public class SortNode extends PlanNode<TsBlock> {
 
     private TraversalOrder sortOrder;
 
@@ -17,7 +17,7 @@ public class SortOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
similarity index 86%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
index c573495..b293a13 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinNode.java
@@ -1,6 +1,6 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 
@@ -11,7 +11,7 @@ import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
  *
  * Children type: [SeriesScanOperator]
  */
-public class TimeJoinOperator extends ExecOperator<Tablet> {
+public class TimeJoinNode extends PlanNode<TsBlock> {
 
     // This parameter indicates the order when executing multiway merge sort.
     private TraversalOrder mergeOrder;
@@ -27,7 +27,7 @@ public class TimeJoinOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
similarity index 76%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
index f25d435..c58a367 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutNode.java
@@ -1,6 +1,6 @@
 package org.apache.iotdb.cluster.query.distribution.operator;
 
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TsBlock;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
 
 /**
@@ -8,7 +8,7 @@ import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
  *
  * Children type: [All the operators whose result set is Tablet]
  */
-public class WithoutOperator extends ExecOperator<Tablet> {
+public class WithoutNode extends PlanNode<TsBlock> {
 
     // The policy to discard the result from upstream operator
     private WithoutPolicy discardPolicy;
@@ -19,7 +19,7 @@ public class WithoutOperator extends ExecOperator<Tablet> {
     }
 
     @Override
-    public Tablet getNextBatch() {
+    public TsBlock getNextBatch() {
         return null;
     }
 }

[iotdb] 01/11: add basic operators

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bdaf57ce7b381784d93d791a51d2a1ead782460d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Mar 1 20:06:07 2022 +0800

    add basic operators
---
 .../query/distribution/common/SeriesBatchData.java | 14 +++++++
 .../query/distribution/common/TraversalOrder.java  |  9 +++++
 .../db/query/distribution/common/TreeNode.java     | 23 +++++++++++
 .../query/distribution/operator/ExecOperator.java  | 18 +++++++++
 .../distribution/operator/SeriesScanOperator.java  | 46 ++++++++++++++++++++++
 .../distribution/operator/TimeJoinOperator.java    | 23 +++++++++++
 6 files changed, 133 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchData.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchData.java
new file mode 100644
index 0000000..7a9ae65
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/SeriesBatchData.java
@@ -0,0 +1,14 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+/**
+ * Batch of data returned by SeriesScanOperator. BatchData (the class generally used in single-node
+ * IoTDB) is suitable for encapsulating part of that result set, so it is leveraged as the `batch` here.
+ * TODO: currently only used for SeriesScanOperator; consider a more general name or some modifications.
+ *
+ * @author xingtanzjr
+ */
+public class SeriesBatchData extends BatchData {
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java
new file mode 100644
index 0000000..6b00372
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TraversalOrder.java
@@ -0,0 +1,9 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+/**
+ * The order in which operators traverse their data by timestamp.
+ */
+public enum TraversalOrder {
+    ASC, // ascending timestamp order
+    DESC // descending timestamp order
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TreeNode.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TreeNode.java
new file mode 100644
index 0000000..9cf2dd3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/common/TreeNode.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.db.query.distribution.common;
+
+import java.util.List;
+
+/**
+ * A simple class to describe the tree style structure of query executable operators.
+ * @param <T> the node type stored as children of this node
+ */
+public class TreeNode<T extends TreeNode<T>> {
+    protected List<T> children; // NOTE(review): never initialized in this class; the methods below throw NPE until it is assigned
+
+    public T getChild(int i) { // returns the i-th child, or null when hasChild(i) is false
+        return hasChild(i) ? children.get(i) : null;
+    }
+
+    public boolean hasChild(int i) { // NOTE(review): only the upper bound is checked, so a negative i also yields true
+        return children.size() > i;
+    }
+
+    public void addChild(T n) { // appends n to the end of the child list
+        children.add(n);
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/ExecOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/ExecOperator.java
new file mode 100644
index 0000000..b9a1691
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/ExecOperator.java
@@ -0,0 +1,18 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.TreeNode;
+
+/**
+ * The base class of query executable operators, used to compose the logical query plan; T is the batch type.
+ * TODO: consider how to restrict the children type for each type of ExecOperator
+ * @author xingtanzjr
+ */
+public abstract class ExecOperator<T> extends TreeNode<ExecOperator<?>> {
+
+    // Judge whether the current operator has more results
+    public abstract boolean hasNext();
+
+    // Get the next result batch of this operator
+    // Return null if there is no more result to return
+    public abstract T getNextBatch();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
new file mode 100644
index 0000000..ebbc03d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/SeriesScanOperator.java
@@ -0,0 +1,46 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.db.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.db.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * SeriesScanOperator is responsible for reading data and pre-aggregated statistics for a specific series.
+ * When reading data, the SeriesScanOperator can read the raw data batch by batch. It can also leverage
+ * the filter and other info to reduce the result set.
+ * Besides, the SeriesScanOperator can read the pre-aggregated statistics in TsFile and return the statistics for
+ * a fixed time range one by one. If the time range is narrower than the smallest pre-aggregated statistic or overlaps
+ * a pre-aggregated statistic, the SeriesScanOperator will read the raw data and calculate the aggregation result for
+ * the specific time range.
+ *
+ * Children type: []
+ */
+public class SeriesScanOperator extends ExecOperator<SeriesBatchData> {
+
+    // The path of the target series which will be scanned.
+    private Path seriesPath;
+
+    // The order to traverse the series by timestamp.
+    // The default order is ASC, which means "order by timestamp asc"
+    private TraversalOrder scanOrder = TraversalOrder.ASC;
+
+    // Filter applied to the data in the current series.
+    private Filter filter;
+
+    // Limit for the result set. The default value is -1, which means no limit
+    private int limit = -1;
+
+    // Offset for the result set. The default value is 0
+    private int offset;
+
+    @Override
+    public boolean hasNext() { // currently always false: no scanning logic is present in this class
+        return false;
+    }
+
+    @Override
+    public SeriesBatchData getNextBatch() { // currently always returns null, i.e. "no more result"
+        return null;
+    }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java
new file mode 100644
index 0000000..8361276
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/distribution/operator/TimeJoinOperator.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.db.query.distribution.operator;
+
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+/**
+ * TimeJoinOperator is responsible for joining two or more series.
+ * The join algorithm is like an outer join on the timestamp column.
+ * The output result of TimeJoinOperator is sorted by timestamp.
+ *
+ * Children type: [SeriesScanOperator]
+ */
+public class TimeJoinOperator extends ExecOperator<RowRecord> {
+
+    @Override
+    public boolean hasNext() { // currently always false: no join logic is present in this class
+        return false;
+    }
+
+    @Override
+    public RowRecord getNextBatch() { // currently always returns null, i.e. "no more result"
+        return null;
+    }
+}