You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/03/08 15:41:16 UTC

[iotdb] 01/02: design of source, sink and internal operators

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch suyurong/operator-design
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 60cd7f73c62c84914db52fc824e0f68f908ef26b
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Mar 8 23:39:41 2022 +0800

    design of source, sink and internal operators
---
 .../distribution/operator/DeviceMergeOperator.java | 40 ---------------
 .../query/distribution/operator/ExecOperator.java  | 18 -------
 .../query/distribution/operator/FillOperator.java  | 25 ----------
 .../distribution/operator/FilterExecOperator.java  | 26 ----------
 .../operator/GroupByLevelOperator.java             | 34 -------------
 .../query/distribution/operator/LimitOperator.java | 24 ---------
 .../distribution/operator/OffsetOperator.java      | 24 ---------
 .../operator/SeriesAggregateOperator.java          | 33 ------------
 .../distribution/operator/SeriesScanOperator.java  | 57 ---------------------
 .../query/distribution/operator/SortOperator.java  | 23 ---------
 .../distribution/operator/TimeJoinOperator.java    | 33 ------------
 .../distribution/operator/WithoutOperator.java     | 25 ----------
 .../operator/internal/DeviceMergeOperator.java     | 43 ++++++++++++++++
 .../operator/internal/FillOperator.java            | 25 ++++++++++
 .../operator/internal/FilterInternalOperator.java  | 26 ++++++++++
 .../operator/internal/GroupByLevelOperator.java    | 34 +++++++++++++
 .../operator/internal/InternalOperator.java        | 17 +++++++
 .../operator/internal/LimitOperator.java           | 24 +++++++++
 .../operator/internal/OffsetOperator.java          | 25 ++++++++++
 .../operator/internal/SeriesAggregateOperator.java | 33 ++++++++++++
 .../operator/internal/SortOperator.java            | 23 +++++++++
 .../operator/internal/TimeJoinOperator.java        | 33 ++++++++++++
 .../operator/internal/WithoutOperator.java         | 25 ++++++++++
 .../operator/source/SeriesScanOperator.java        | 58 ++++++++++++++++++++++
 24 files changed, 366 insertions(+), 362 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
deleted file mode 100644
index 1800179..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And output the result with
- * specific order. The order could be 'order by device' or 'order by timestamp'
- *
- * The types of involved devices should be same. If the device contains n series, the device-based view will contain n+2
- * columns, which are timestamp column, device name column and n value columns of involved series.
- *
- * Children type: [TimeJoinOperator]
- */
-public class DeviceMergeOperator extends ExecOperator<Tablet> {
-    // The result output order that this operator
-    private TraversalOrder mergeOrder;
-
-    // Owned devices
-    private List<String> ownedDeviceNameList;
-
-    // The map from deviceName to corresponding query result operator responsible for that device.
-    private Map<String, TimeJoinOperator> upstreamMap;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will contain n+1 columns where
-    // the additional column is `deviceName`
-    // And, the `alignedByDevice` in the TabletMetadata will be `true`
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
deleted file mode 100644
index 883589b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
-
-/**
- * @author xingtanzjr
- * The base class of query executable operators, which is used to compose logical query plan.
- * TODO: consider how to restrict the children type for each type of ExecOperator
- */
-public abstract class ExecOperator<T> extends TreeNode<ExecOperator<?>> {
-
-    // Judge whether current operator has more result
-    public abstract boolean hasNext();
-
-    // Get next result batch of this operator
-    // Return null if there is no more result to return
-    public abstract T getNextBatch();
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
deleted file mode 100644
index 9390e59..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * FillOperator is used to fill the empty field in one row.
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class FillOperator extends ExecOperator<Tablet> {
-
-    // The policy to discard the result from upstream operator
-    private FillPolicy fillPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
deleted file mode 100644
index efe4609..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
-
-/**
- * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node IoTDB)
- * The FilterExecOperator is responsible to filter the RowRecord from Tablet.
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class FilterExecOperator extends ExecOperator<Tablet> {
-
-    // The filter
-    private FilterOperator rowFilter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
deleted file mode 100644
index 729c84f..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * This operator is responsible for the final aggregation merge operation.
- * It will arrange the data by time range firstly. And inside each time range, the data from same measurement and
- * different devices will be rolled up by corresponding level into different buckets.
- * If the bucketInfo is empty, the data from `same measurement and different devices` won't be rolled up.
- * If the groupByTimeParameter is null, the data won't be split by time range.
- *
- * Children type: [SeriesAggregateOperator]
- */
-public class GroupByLevelOperator extends ExecOperator<Tablet> {
-
-    // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
-    private LevelBucketInfo bucketInfo;
-
-    // The parameter of `group by time`
-    // The GroupByLevelOperator also need GroupByTimeParameter
-    private GroupByTimeParameter groupByTimeParameter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
deleted file mode 100644
index a48e664..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * LimitOperator is used to select top n result. It uses the default order of upstream operators
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class LimitOperator extends ExecOperator<Tablet> {
-
-    // The limit count
-    private int limit;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
deleted file mode 100644
index 11cd7de..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class OffsetOperator extends ExecOperator<Tablet> {
-
-    // The limit count
-    private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
deleted file mode 100644
index e566f49..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
-
-/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
- * This operator will split data in one series into many groups by time range and do the aggregation calculation for each
- * group.
- * If there is no split parameter, it will return one result which is the aggregation result of all data in current series.
- *
- * Children type: [SeriesScanOperator]
- */
-public class SeriesAggregateOperator extends ExecOperator<SeriesBatchAggInfo> {
-
-    // The parameter of `group by time`
-    // Its value will be null if there is no `group by time` clause,
-    private GroupByTimeParameter groupByTimeParameter;
-
-    // TODO: need consider how to represent the aggregation function and corresponding implementation
-    // We use a String to indicate the parameter temporarily
-    private String aggregationFunc;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchAggInfo getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
deleted file mode 100644
index 46077bd..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-/**
- * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific series.
- * When reading data, the SeriesScanOperator can read the raw data batch by batch. And also, it can leverage
- * the filter and other info to decrease the result set.
- * Besides, the SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
- * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated statistic or has overlap
- * with pre-aggregated statistic, the SeriesScanOperator will read the raw data and calculate the aggregation result for
- * specific time range.
- *
- * Children type: []
- */
-public class SeriesScanOperator extends ExecOperator<SeriesBatchData> {
-
-    // The path of the target series which will be scanned.
-    private Path seriesPath;
-
-    // The order to traverse the data.
-    // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
-    // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-    private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
-
-    // Filter data in current series.
-    private Filter filter;
-
-    // Limit for result set. The default value is -1, which means no limit
-    private int limit = -1;
-
-    // offset for result set. The default value is 0
-    private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchData getNextBatch() {
-        return null;
-    }
-
-    // This method will only be invoked by SeriesAggregateOperator
-    // It will return the statistics of the series in given time range
-    // When calculate the statistics, the operator should use the most optimized way to do that. In other words, using
-    // raw data is the final way to do that.
-    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
deleted file mode 100644
index ab3f01d..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-/**
- * In general, the parameter in sortOperator should be pushed down to the upstream operators.
- * In our optimized logical query plan, the sortOperator should not appear.
- */
-public class SortOperator extends ExecOperator<Tablet> {
-
-    private TraversalOrder sortOrder;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
deleted file mode 100644
index c573495..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * TimeJoinOperator is responsible for join two or more series.
- * The join algorithm is like outer join by timestamp column.
- * The output result of TimeJoinOperator is sorted by timestamp
- *
- * Children type: [SeriesScanOperator]
- */
-public class TimeJoinOperator extends ExecOperator<Tablet> {
-
-    // This parameter indicates the order when executing multiway merge sort.
-    private TraversalOrder mergeOrder;
-
-    // The policy to decide whether a row should be discarded
-    // The without policy is able to be push down to the TimeJoinOperator because we can know whether a row contains
-    // null or not in this operator the situation won't be changed by the downstream operators.
-    private WithoutPolicy withoutPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
deleted file mode 100644
index f25d435..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * WithoutOperator is used to discard specific result from upstream operators.
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class WithoutOperator extends ExecOperator<Tablet> {
-
-    // The policy to discard the result from upstream operator
-    private WithoutPolicy discardPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
new file mode 100644
index 0000000..b912994
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
@@ -0,0 +1,43 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And
+ * output the result with specific order. The order could be 'order by device' or 'order by
+ * timestamp'
+ *
+ * <p>The types of involved devices should be same. If the device contains n series, the
+ * device-based view will contain n+2 columns, which are timestamp column, device name column and n
+ * value columns of involved series.
+ *
+ * <p>Children type: [TimeJoinOperator]
+ */
+public class DeviceMergeOperator extends InternalOperator<Tablet> {
+  // The result output order that this operator
+  private TraversalOrder mergeOrder;
+
+  // Owned devices
+  private List<String> ownedDeviceNameList;
+
+  // The map from deviceName to corresponding query result operator responsible for that device.
+  private Map<String, TimeJoinOperator> upstreamMap;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will
+  // contain n+1 columns where
+  // the additional column is `deviceName`
+  // And, the `alignedByDevice` in the TabletMetadata will be `true`
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
new file mode 100644
index 0000000..8ee610a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * FillOperator is used to fill the empty field in one row.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class FillOperator extends InternalOperator<Tablet> {
+
+  // The policy to discard the result from upstream operator
+  private FillPolicy fillPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
new file mode 100644
index 0000000..ecbda0f
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+/**
+ * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node
+ * IoTDB) The FilterExecOperator is responsible to filter the RowRecord from Tablet.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class FilterInternalOperator extends InternalOperator<Tablet> {
+
+  // The filter
+  private FilterOperator rowFilter;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
new file mode 100644
index 0000000..c6e00d2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * This operator is responsible for the final aggregation merge operation. It will arrange the data
+ * by time range firstly. And inside each time range, the data from same measurement and different
+ * devices will be rolled up by corresponding level into different buckets. If the bucketInfo is
+ * empty, the data from `same measurement and different devices` won't be rolled up. If the
+ * groupByTimeParameter is null, the data won't be split by time range.
+ *
+ * <p>Children type: [SeriesAggregateOperator]
+ */
+public class GroupByLevelOperator extends InternalOperator<Tablet> {
+
+  // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
+  private LevelBucketInfo bucketInfo;
+
+  // The parameter of `group by time`
+  // The GroupByLevelOperator also need GroupByTimeParameter
+  private GroupByTimeParameter groupByTimeParameter;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
new file mode 100644
index 0000000..0692381
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
@@ -0,0 +1,17 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
+
+/**
+ * @author xingtanzjr The base class of query executable operators, which is used to compose logical
+ *     query plan. TODO: consider how to restrict the children type for each type of ExecOperator
+ */
+public abstract class InternalOperator<T> extends TreeNode<InternalOperator<?>> {
+
+  // Judge whether current operator has more result
+  public abstract boolean hasNext();
+
+  // Get next result batch of this operator
+  // Return null if there is no more result to return
+  public abstract T getNextBatch();
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
new file mode 100644
index 0000000..9075f64
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
@@ -0,0 +1,24 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * LimitOperator is used to select top n result. It uses the default order of upstream operators
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class LimitOperator extends InternalOperator<Tablet> {
+
+  // The limit count
+  private int limit;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
new file mode 100644
index 0000000..c6f79ac
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of
+ * upstream operators
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class OffsetOperator extends InternalOperator<Tablet> {
+
+  // The limit count
+  private int offset;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
new file mode 100644
index 0000000..96a5c86
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
+
+/**
+ * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. This
+ * operator will split data in one series into many groups by time range and do the aggregation
+ * calculation for each group. If there is no split parameter, it will return one result which is
+ * the aggregation result of all data in current series.
+ *
+ * <p>Children type: [SeriesScanOperator]
+ */
+public class SeriesAggregateOperator extends InternalOperator<SeriesBatchAggInfo> {
+
+  // The parameter of `group by time`
+  // Its value will be null if there is no `group by time` clause,
+  private GroupByTimeParameter groupByTimeParameter;
+
+  // TODO: need consider how to represent the aggregation function and corresponding implementation
+  // We use a String to indicate the parameter temporarily
+  private String aggregationFunc;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchAggInfo getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
new file mode 100644
index 0000000..fa83be7
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+
+/**
+ * In general, the parameter in sortOperator should be pushed down to the upstream operators. In our
+ * optimized logical query plan, the sortOperator should not appear.
+ */
+public class SortOperator extends InternalOperator<Tablet> {
+
+  private TraversalOrder sortOrder;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
new file mode 100644
index 0000000..c4ae6f3
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+
+/**
+ * TimeJoinOperator is responsible for join two or more series. The join algorithm is like outer
+ * join by timestamp column. The output result of TimeJoinOperator is sorted by timestamp
+ *
+ * <p>Children type: [SeriesScanOperator]
+ */
+public class TimeJoinOperator extends InternalOperator<Tablet> {
+
+  // This parameter indicates the order when executing multiway merge sort.
+  private TraversalOrder mergeOrder;
+
+  // The policy to decide whether a row should be discarded
+  // The without policy is able to be push down to the TimeJoinOperator because we can know whether
+  // a row contains
+  // null or not in this operator the situation won't be changed by the downstream operators.
+  private WithoutPolicy withoutPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
new file mode 100644
index 0000000..535a3d6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+
+/**
+ * WithoutOperator is used to discard specific result from upstream operators.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class WithoutOperator extends InternalOperator<Tablet> {
+
+  // The policy to discard the result from upstream operator
+  private WithoutPolicy discardPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
new file mode 100644
index 0000000..9cd6043
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific
+ * series. When reading data, the SeriesScanOperator can read the raw data batch by batch. And also,
+ * it can leverage the filter and other info to decrease the result set. Besides, the
+ * SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
+ * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated
+ * statistic or has overlap with pre-aggregated statistic, the SeriesScanOperator will read the raw
+ * data and calculate the aggregation result for specific time range.
+ *
+ * <p>Children type: []
+ */
+public class SeriesScanOperator extends InternalOperator<SeriesBatchData> {
+
+  // The path of the target series which will be scanned.
+  private Path seriesPath;
+
+  // The order to traverse the data.
+  // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
+  // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
+  private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
+
+  // Filter data in current series.
+  private Filter filter;
+
+  // Limit for result set. The default value is -1, which means no limit
+  private int limit = -1;
+
+  // offset for result set. The default value is 0
+  private int offset;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  // This method will only be invoked by SeriesAggregateOperator
+  // It will return the statistics of the series in given time range
+  // When calculate the statistics, the operator should use the most optimized way to do that. In
+  // other words, using
+  // raw data is the final way to do that.
+  public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
+    return null;
+  }
+}