You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/03/08 15:41:15 UTC

[iotdb] branch suyurong/operator-design created (now bdbd38e)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch suyurong/operator-design
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at bdbd38e  design of source, sink and internal operators

This branch includes the following new commits:

     new 60cd7f7  design of source, sink and internal operators
     new bdbd38e  design of source, sink and internal operators

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/02: design of source, sink and internal operators

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch suyurong/operator-design
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 60cd7f73c62c84914db52fc824e0f68f908ef26b
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Mar 8 23:39:41 2022 +0800

    design of source, sink and internal operators
---
 .../distribution/operator/DeviceMergeOperator.java | 40 ---------------
 .../query/distribution/operator/ExecOperator.java  | 18 -------
 .../query/distribution/operator/FillOperator.java  | 25 ----------
 .../distribution/operator/FilterExecOperator.java  | 26 ----------
 .../operator/GroupByLevelOperator.java             | 34 -------------
 .../query/distribution/operator/LimitOperator.java | 24 ---------
 .../distribution/operator/OffsetOperator.java      | 24 ---------
 .../operator/SeriesAggregateOperator.java          | 33 ------------
 .../distribution/operator/SeriesScanOperator.java  | 57 ---------------------
 .../query/distribution/operator/SortOperator.java  | 23 ---------
 .../distribution/operator/TimeJoinOperator.java    | 33 ------------
 .../distribution/operator/WithoutOperator.java     | 25 ----------
 .../operator/internal/DeviceMergeOperator.java     | 43 ++++++++++++++++
 .../operator/internal/FillOperator.java            | 25 ++++++++++
 .../operator/internal/FilterInternalOperator.java  | 26 ++++++++++
 .../operator/internal/GroupByLevelOperator.java    | 34 +++++++++++++
 .../operator/internal/InternalOperator.java        | 17 +++++++
 .../operator/internal/LimitOperator.java           | 24 +++++++++
 .../operator/internal/OffsetOperator.java          | 25 ++++++++++
 .../operator/internal/SeriesAggregateOperator.java | 33 ++++++++++++
 .../operator/internal/SortOperator.java            | 23 +++++++++
 .../operator/internal/TimeJoinOperator.java        | 33 ++++++++++++
 .../operator/internal/WithoutOperator.java         | 25 ++++++++++
 .../operator/source/SeriesScanOperator.java        | 58 ++++++++++++++++++++++
 24 files changed, 366 insertions(+), 362 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
deleted file mode 100644
index 1800179..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/DeviceMergeOperator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And output the result with
- * specific order. The order could be 'order by device' or 'order by timestamp'
- *
- * The types of involved devices should be same. If the device contains n series, the device-based view will contain n+2
- * columns, which are timestamp column, device name column and n value columns of involved series.
- *
- * Children type: [TimeJoinOperator]
- */
-public class DeviceMergeOperator extends ExecOperator<Tablet> {
-    // The result output order that this operator
-    private TraversalOrder mergeOrder;
-
-    // Owned devices
-    private List<String> ownedDeviceNameList;
-
-    // The map from deviceName to corresponding query result operator responsible for that device.
-    private Map<String, TimeJoinOperator> upstreamMap;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will contain n+1 columns where
-    // the additional column is `deviceName`
-    // And, the `alignedByDevice` in the TabletMetadata will be `true`
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
deleted file mode 100644
index 883589b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecOperator.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
-
-/**
- * @author xingtanzjr
- * The base class of query executable operators, which is used to compose logical query plan.
- * TODO: consider how to restrict the children type for each type of ExecOperator
- */
-public abstract class ExecOperator<T> extends TreeNode<ExecOperator<?>> {
-
-    // Judge whether current operator has more result
-    public abstract boolean hasNext();
-
-    // Get next result batch of this operator
-    // Return null if there is no more result to return
-    public abstract T getNextBatch();
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
deleted file mode 100644
index 9390e59..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FillOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * FillOperator is used to fill the empty field in one row.
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class FillOperator extends ExecOperator<Tablet> {
-
-    // The policy to discard the result from upstream operator
-    private FillPolicy fillPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
deleted file mode 100644
index efe4609..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/FilterExecOperator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
-
-/**
- * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node IoTDB)
- * The FilterExecOperator is responsible to filter the RowRecord from Tablet.
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class FilterExecOperator extends ExecOperator<Tablet> {
-
-    // The filter
-    private FilterOperator rowFilter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
deleted file mode 100644
index 729c84f..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/GroupByLevelOperator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * This operator is responsible for the final aggregation merge operation.
- * It will arrange the data by time range firstly. And inside each time range, the data from same measurement and
- * different devices will be rolled up by corresponding level into different buckets.
- * If the bucketInfo is empty, the data from `same measurement and different devices` won't be rolled up.
- * If the groupByTimeParameter is null, the data won't be split by time range.
- *
- * Children type: [SeriesAggregateOperator]
- */
-public class GroupByLevelOperator extends ExecOperator<Tablet> {
-
-    // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
-    private LevelBucketInfo bucketInfo;
-
-    // The parameter of `group by time`
-    // The GroupByLevelOperator also need GroupByTimeParameter
-    private GroupByTimeParameter groupByTimeParameter;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
deleted file mode 100644
index a48e664..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/LimitOperator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * LimitOperator is used to select top n result. It uses the default order of upstream operators
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class LimitOperator extends ExecOperator<Tablet> {
-
-    // The limit count
-    private int limit;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
deleted file mode 100644
index 11cd7de..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/OffsetOperator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-
-/**
- * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class OffsetOperator extends ExecOperator<Tablet> {
-
-    // The limit count
-    private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
deleted file mode 100644
index e566f49..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesAggregateOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
-
-/**
- * SeriesAggregateOperator is responsible to do the aggregation calculation for one series.
- * This operator will split data in one series into many groups by time range and do the aggregation calculation for each
- * group.
- * If there is no split parameter, it will return one result which is the aggregation result of all data in current series.
- *
- * Children type: [SeriesScanOperator]
- */
-public class SeriesAggregateOperator extends ExecOperator<SeriesBatchAggInfo> {
-
-    // The parameter of `group by time`
-    // Its value will be null if there is no `group by time` clause,
-    private GroupByTimeParameter groupByTimeParameter;
-
-    // TODO: need consider how to represent the aggregation function and corresponding implementation
-    // We use a String to indicate the parameter temporarily
-    private String aggregationFunc;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchAggInfo getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
deleted file mode 100644
index 46077bd..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SeriesScanOperator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-/**
- * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific series.
- * When reading data, the SeriesScanOperator can read the raw data batch by batch. And also, it can leverage
- * the filter and other info to decrease the result set.
- * Besides, the SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with
- * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated statistic or has overlap
- * with pre-aggregated statistic, the SeriesScanOperator will read the raw data and calculate the aggregation result for
- * specific time range.
- *
- * Children type: []
- */
-public class SeriesScanOperator extends ExecOperator<SeriesBatchData> {
-
-    // The path of the target series which will be scanned.
-    private Path seriesPath;
-
-    // The order to traverse the data.
-    // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
-    // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
-    private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
-
-    // Filter data in current series.
-    private Filter filter;
-
-    // Limit for result set. The default value is -1, which means no limit
-    private int limit = -1;
-
-    // offset for result set. The default value is 0
-    private int offset;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public SeriesBatchData getNextBatch() {
-        return null;
-    }
-
-    // This method will only be invoked by SeriesAggregateOperator
-    // It will return the statistics of the series in given time range
-    // When calculate the statistics, the operator should use the most optimized way to do that. In other words, using
-    // raw data is the final way to do that.
-    public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
deleted file mode 100644
index ab3f01d..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/SortOperator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-
-/**
- * In general, the parameter in sortOperator should be pushed down to the upstream operators.
- * In our optimized logical query plan, the sortOperator should not appear.
- */
-public class SortOperator extends ExecOperator<Tablet> {
-
-    private TraversalOrder sortOrder;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
deleted file mode 100644
index c573495..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/TimeJoinOperator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * TimeJoinOperator is responsible for join two or more series.
- * The join algorithm is like outer join by timestamp column.
- * The output result of TimeJoinOperator is sorted by timestamp
- *
- * Children type: [SeriesScanOperator]
- */
-public class TimeJoinOperator extends ExecOperator<Tablet> {
-
-    // This parameter indicates the order when executing multiway merge sort.
-    private TraversalOrder mergeOrder;
-
-    // The policy to decide whether a row should be discarded
-    // The without policy is able to be push down to the TimeJoinOperator because we can know whether a row contains
-    // null or not in this operator the situation won't be changed by the downstream operators.
-    private WithoutPolicy withoutPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
deleted file mode 100644
index f25d435..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/WithoutOperator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
-
-import org.apache.iotdb.cluster.query.distribution.common.Tablet;
-import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
-
-/**
- * WithoutOperator is used to discard specific result from upstream operators.
- *
- * Children type: [All the operators whose result set is Tablet]
- */
-public class WithoutOperator extends ExecOperator<Tablet> {
-
-    // The policy to discard the result from upstream operator
-    private WithoutPolicy discardPolicy;
-
-    @Override
-    public boolean hasNext() {
-        return false;
-    }
-
-    @Override
-    public Tablet getNextBatch() {
-        return null;
-    }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
new file mode 100644
index 0000000..b912994
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
@@ -0,0 +1,43 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DeviceMergeOperator is responsible for constructing a device-based view of a set of series and
+ * outputting the result in a specific order, which could be 'order by device' or 'order by
+ * timestamp'.
+ *
+ * <p>The types of involved devices should be the same. If each device contains n series, the
+ * device-based view will contain n+2 columns: the timestamp column, the device name column and
+ * the n value columns of the involved series.
+ *
+ * <p>Children type: [TimeJoinOperator]
+ */
+public class DeviceMergeOperator extends InternalOperator<Tablet> {
+  // The order in which this operator outputs its results
+  private TraversalOrder mergeOrder;
+
+  // Names of the devices owned by this operator
+  private List<String> ownedDeviceNameList;
+
+  // The map from deviceName to corresponding query result operator responsible for that device.
+  private Map<String, TimeJoinOperator> upstreamMap;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will
+  // contain n+1 columns, where the additional column is `deviceName`. (Here n presumably counts
+  // the timestamp column too, consistent with the n+2 in the class doc - TODO confirm.)
+  // And, the `alignedByDevice` in the TabletMetadata of the output will be `true`.
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
new file mode 100644
index 0000000..8ee610a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * FillOperator is used to fill the empty fields in a row.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class FillOperator extends InternalOperator<Tablet> {
+
+  // The policy used to fill the empty fields of the results from upstream operators
+  private FillPolicy fillPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
new file mode 100644
index 0000000..ecbda0f
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+/**
+ * FilterInternalOperator is responsible for filtering the RowRecords of a Tablet. (It is named so
+ * to distinguish it from the FilterOperator used in single-node IoTDB.)
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class FilterInternalOperator extends InternalOperator<Tablet> {
+
+  // The row filter, expressed with the single-node IoTDB FilterOperator type
+  private FilterOperator rowFilter;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
new file mode 100644
index 0000000..c6e00d2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * This operator is responsible for the final aggregation merge operation. It first arranges the
+ * data by time range. Inside each time range, the data from the same measurement but different
+ * devices will be rolled up by the corresponding level into different buckets. If the bucketInfo
+ * is empty, the data from `same measurement and different devices` won't be rolled up. If the
+ * groupByTimeParameter is null, the data won't be split by time range.
+ *
+ * <p>Children type: [SeriesAggregateOperator]
+ */
+public class GroupByLevelOperator extends InternalOperator<Tablet> {
+
+  // All the buckets that the SeriesBatchAggInfo from upstream will be divided into.
+  private LevelBucketInfo bucketInfo;
+
+  // The parameter of `group by time`
+  // The GroupByLevelOperator also needs the GroupByTimeParameter (null: no split by time range)
+  private GroupByTimeParameter groupByTimeParameter;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
new file mode 100644
index 0000000..0692381
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
@@ -0,0 +1,17 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
+
+/**
+ * The base class of internal query operators, which are used to compose the logical query plan.
+ * @author xingtanzjr
+ */
+public abstract class InternalOperator<T> extends TreeNode<InternalOperator<?>> {
+  // TODO: consider how to restrict the children type for each type of InternalOperator.
+
+  // Returns whether the current operator has more results
+  public abstract boolean hasNext();
+
+  // Returns the next result batch of this operator, or null if there is no more result to return
+  public abstract T getNextBatch();
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
new file mode 100644
index 0000000..9075f64
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
@@ -0,0 +1,24 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * LimitOperator is used to select the top n results, in the default order of upstream operators.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class LimitOperator extends InternalOperator<Tablet> {
+
+  // The limit count, i.e. the maximum number (n) of results to output
+  private int limit;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
new file mode 100644
index 0000000..c6f79ac
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * OffsetOperator is used to skip the top n results from upstream operators. It uses the default
+ * order of the upstream operators.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class OffsetOperator extends InternalOperator<Tablet> {
+
+  // The offset count, i.e. the number (n) of leading results to skip
+  private int offset;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
new file mode 100644
index 0000000..96a5c86
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
+
+/**
+ * SeriesAggregateOperator is responsible for doing the aggregation calculation for one series.
+ * This operator will split the data in one series into groups by time range and do the
+ * aggregation calculation for each group. If there is no split parameter, it will return one
+ * result, which is the aggregation result of all the data in the current series.
+ *
+ * <p>Children type: [SeriesScanOperator]
+ */
+public class SeriesAggregateOperator extends InternalOperator<SeriesBatchAggInfo> {
+
+  // The parameter of `group by time`
+  // Its value will be null if there is no `group by time` clause.
+  private GroupByTimeParameter groupByTimeParameter;
+
+  // TODO: need to consider how to represent the aggregation function and its implementation
+  // We use a String to indicate the parameter temporarily
+  private String aggregationFunc;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchAggInfo getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
new file mode 100644
index 0000000..fa83be7
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+
+/**
+ * In general, the sort parameter (sortOrder) of SortOperator should be pushed down to upstream
+ * operators, so SortOperator should not appear in our optimized logical query plan.
+ */
+public class SortOperator extends InternalOperator<Tablet> {
+
+  private TraversalOrder sortOrder;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
new file mode 100644
index 0000000..c4ae6f3
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+
+/**
+ * TimeJoinOperator is responsible for joining two or more series. The join algorithm is like an
+ * outer join on the timestamp column. The output of TimeJoinOperator is sorted by timestamp.
+ *
+ * <p>Children type: [SeriesScanOperator]
+ */
+public class TimeJoinOperator extends InternalOperator<Tablet> {
+
+  // This parameter indicates the order when executing multiway merge sort.
+  private TraversalOrder mergeOrder;
+
+  // The policy to decide whether a row should be discarded.
+  // The without policy can be pushed down to the TimeJoinOperator because whether a row
+  // contains null is already known in this operator, and this will not be changed by the
+  // downstream operators.
+  private WithoutPolicy withoutPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
new file mode 100644
index 0000000..535a3d6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+
+/**
+ * WithoutOperator is used to discard specific results from upstream operators.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class WithoutOperator extends InternalOperator<Tablet> {
+
+  // The policy that decides which results from the upstream operator are discarded
+  private WithoutPolicy discardPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
new file mode 100644
index 0000000..9cd6043
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * SeriesScanOperator is responsible for reading data and pre-aggregated statistics for a specific
+ * series. When reading data, the SeriesScanOperator can read the raw data batch by batch, and it
+ * can leverage the filter and other information to reduce the result set. Besides, the
+ * SeriesScanOperator can read the pre-aggregated statistics in TsFile and return them one by one,
+ * each covering a fixed time range. If the time range is narrower than the smallest pre-aggregated
+ * statistic or overlaps a pre-aggregated statistic, the SeriesScanOperator will read the raw data
+ * and calculate the aggregation result for that time range.
+ *
+ * <p>Children type: []
+ */
+public class SeriesScanOperator extends InternalOperator<SeriesBatchData> {
+
+  // The path of the target series which will be scanned.
+  private Path seriesPath;
+
+  // The order to traverse the data.
+  // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
+  // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
+  private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
+
+  // Filter data in current series.
+  private Filter filter;
+
+  // Limit for result set. The default value is -1, which means no limit
+  private int limit = -1;
+
+  // Offset for the result set. The default value is 0
+  private int offset;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  // This method will only be invoked by SeriesAggregateOperator.
+  // It returns the statistics of the series in the given time range.
+  // When calculating the statistics, the operator should use the most optimized way to do so
+  // (e.g. the pre-aggregated statistics in TsFile). In other words, reading raw data is the
+  // last resort.
+  public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
+    return null;
+  }
+}

[iotdb] 02/02: design of source, sink and internal operators

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch suyurong/operator-design
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bdbd38eedbd43af3f764df308d4214eb4643c24d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Mar 8 23:40:05 2022 +0800

    design of source, sink and internal operators
---
 .../distribution/operator/ExecutableOperator.java  | 34 ++++++++++++++++++
 .../operator/internal/DeviceMergeOperator.java     |  2 +-
 .../operator/internal/FillOperator.java            |  2 +-
 .../operator/internal/FilterInternalOperator.java  |  2 +-
 .../operator/internal/GroupByLevelOperator.java    |  2 +-
 .../operator/internal/InternalOperator.java        | 20 +++--------
 .../operator/internal/LimitOperator.java           |  2 +-
 .../operator/internal/OffsetOperator.java          |  2 +-
 .../operator/internal/SeriesAggregateOperator.java |  2 +-
 .../operator/internal/SortOperator.java            |  2 +-
 .../operator/internal/TimeJoinOperator.java        |  2 +-
 .../operator/internal/WithoutOperator.java         |  2 +-
 .../operator/sink/CsvSinkOperator.java             | 41 ++++++++++++++++++++++
 .../operator/sink/RestSinkOperator.java            | 41 ++++++++++++++++++++++
 .../distribution/operator/sink/SinkOperator.java   | 28 +++++++++++++++
 .../operator/sink/ThriftSinkOperator.java          | 41 ++++++++++++++++++++++
 .../operator/source/CsvSourceOperator.java         | 41 ++++++++++++++++++++++
 .../operator/source/SeriesScanOperator.java        | 10 ++++--
 .../operator/source/SourceOperator.java            | 28 +++++++++++++++
 19 files changed, 277 insertions(+), 27 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
new file mode 100644
index 0000000..00a7a6a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/ExecutableOperator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
+
+public abstract class ExecutableOperator<T> extends TreeNode<ExecutableOperator<?>> {
+
+  // TODO: resource control, runtime control, etc.
+
+  // Check whether the current operator has more results.
+  public abstract boolean hasNext();
+
+  // Get the next result batch of this operator.
+  // Return null if there are no more results to return.
+  public abstract T getNextBatch();
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
index b912994..d4bbced 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
index 8ee610a..595a27e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
index ecbda0f..f41d768 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
index c6e00d2..9cab1c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
index 0692381..a5b8b5b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
@@ -1,17 +1,7 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
-import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
+import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
 
-/**
- * @author xingtanzjr The base class of query executable operators, which is used to compose logical
- *     query plan. TODO: consider how to restrict the children type for each type of ExecOperator
- */
-public abstract class InternalOperator<T> extends TreeNode<InternalOperator<?>> {
-
-  // Judge whether current operator has more result
-  public abstract boolean hasNext();
-
-  // Get next result batch of this operator
-  // Return null if there is no more result to return
-  public abstract T getNextBatch();
-}
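+// Design note: an InternalOperator pulls data from the buffer
+// and holds the logic for pushing data to the downstream operators.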
+public abstract class InternalOperator<T> extends ExecutableOperator<T> {}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
index 9075f64..1330eee 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
index c6f79ac..aa82d8c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
index 96a5c86..889c50b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
index fa83be7..d2aee44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
index c4ae6f3..bd7cf37 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
index 535a3d6..f678a2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.internal;
 
 import org.apache.iotdb.cluster.query.distribution.common.Tablet;
 import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
new file mode 100644
index 0000000..17bba65
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/CsvSinkOperator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.sink;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+
+public class CsvSinkOperator extends SinkOperator<SeriesBatchData> {
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
new file mode 100644
index 0000000..d8e6588
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/RestSinkOperator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.sink;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+
+public class RestSinkOperator extends SinkOperator<SeriesBatchData> {
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
new file mode 100644
index 0000000..29c906d
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/SinkOperator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.sink;
+
+import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+
+// Establishes the connection with the client.
+public abstract class SinkOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+
+  public abstract void open() throws Exception;
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
new file mode 100644
index 0000000..c9de077
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/sink/ThriftSinkOperator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.sink;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+
+public class ThriftSinkOperator extends SinkOperator<SeriesBatchData> {
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
new file mode 100644
index 0000000..ff82128
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/CsvSourceOperator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.source;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+
+public class CsvSourceOperator extends SourceOperator<SeriesBatchData> {
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public void open() throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
index 9cd6043..fd67f9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.distribution.operator;
+package org.apache.iotdb.cluster.query.distribution.operator.source;
 
 import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
 import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
@@ -18,7 +18,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  *
  * <p>Children type: []
  */
-public class SeriesScanOperator extends InternalOperator<SeriesBatchData> {
+public class SeriesScanOperator extends SourceOperator<SeriesBatchData> {
 
   // The path of the target series which will be scanned.
   private Path seriesPath;
@@ -55,4 +55,10 @@ public class SeriesScanOperator extends InternalOperator<SeriesBatchData> {
   public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
     return null;
   }
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public void open() throws Exception {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
new file mode 100644
index 0000000..304cafe
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SourceOperator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.query.distribution.operator.source;
+
+import org.apache.iotdb.cluster.query.distribution.operator.ExecutableOperator;
+
+// Distinguishes between different data sources; designed to be extensible.
+public abstract class SourceOperator<T> extends ExecutableOperator<T> implements AutoCloseable {
+
+  public abstract void open() throws Exception;
+}