You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/14 05:53:39 UTC

[iotdb] 07/11: design of source, sink and internal operators

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f02e74eed0091af5d8a5b380c15a8537260baf02
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Mar 8 23:39:41 2022 +0800

    design of source, sink and internal operators
---
 .../operator/internal/DeviceMergeOperator.java     | 43 ++++++++++++++++
 .../operator/internal/FillOperator.java            | 25 ++++++++++
 .../operator/internal/FilterInternalOperator.java  | 26 ++++++++++
 .../operator/internal/GroupByLevelOperator.java    | 34 +++++++++++++
 .../operator/internal/InternalOperator.java        | 17 +++++++
 .../operator/internal/LimitOperator.java           | 24 +++++++++
 .../operator/internal/OffsetOperator.java          | 25 ++++++++++
 .../operator/internal/SeriesAggregateOperator.java | 33 ++++++++++++
 .../operator/internal/SortOperator.java            | 23 +++++++++
 .../operator/internal/TimeJoinOperator.java        | 33 ++++++++++++
 .../operator/internal/WithoutOperator.java         | 25 ++++++++++
 .../operator/source/SeriesScanOperator.java        | 58 ++++++++++++++++++++++
 12 files changed, 366 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
new file mode 100644
index 0000000..b912994
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java
@@ -0,0 +1,43 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DeviceMergeOperator constructs a device-based view of a set of series and outputs the result in
+ * a specific order, which can be either 'order by device' or 'order by timestamp'. NOTE: this is
+ * still a design stub; hasNext() always returns false and getNextBatch() always returns null.
+ *
+ * <p>All involved devices should have the same type. If each device contains n series, the
+ * device-based view contains n+2 columns: the timestamp column, the device name column and the n
+ * value columns of the involved series.
+ *
+ * <p>Children type: [TimeJoinOperator]
+ */
+public class DeviceMergeOperator extends InternalOperator<Tablet> {
+  // The order in which this operator outputs the merged result.
+  private TraversalOrder mergeOrder;
+
+  // Names of the devices owned by this operator.
+  private List<String> ownedDeviceNameList;
+
+  // Map from device name to the TimeJoinOperator producing the query result of that device.
+  private Map<String, TimeJoinOperator> upstreamMap;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will
+  // contain n+1 columns, the additional column being `deviceName`. The `alignedByDevice` flag in
+  // the output TabletMetadata will be `true`.
+  // TODO: not implemented yet; currently always returns null.
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
new file mode 100644
index 0000000..8ee610a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.FillPolicy;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * FillOperator fills the empty fields of each row according to its FillPolicy.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class FillOperator extends InternalOperator<Tablet> {
+
+  // The policy used to fill the empty fields of rows coming from the upstream operator
+  private FillPolicy fillPolicy;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
new file mode 100644
index 0000000..ecbda0f
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+
+/**
+ * FilterInternalOperator filters the rows (RowRecords) of the Tablets from upstream operators.
+ * It is named differently to distinguish it from the FilterOperator used in single-node IoTDB.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class FilterInternalOperator extends InternalOperator<Tablet> {
+
+  // The filter condition applied to each row (reuses the single-node logical FilterOperator)
+  private FilterOperator rowFilter;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
new file mode 100644
index 0000000..c6e00d2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java
@@ -0,0 +1,34 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo;
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * GroupByLevelOperator performs the final merge of aggregation results.
+ *
+ * <p>Data is first arranged by time range. Within each time range, data of the same measurement
+ * from different devices is rolled up by the corresponding level into different buckets. An empty
+ * bucketInfo disables this roll-up; a null groupByTimeParameter means no split by time range.
+ *
+ * <p>Children type: [SeriesAggregateOperator]
+ */
+public class GroupByLevelOperator extends InternalOperator<Tablet> {
+
+  // Parameter of the `group by time` clause, which GroupByLevelOperator needs as well.
+  // A null value means the data is not split by time range.
+  private GroupByTimeParameter groupByTimeParameter;
+
+  // The buckets into which the SeriesBatchAggInfo from upstream operators is divided.
+  private LevelBucketInfo bucketInfo;
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
new file mode 100644
index 0000000..0692381
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java
@@ -0,0 +1,17 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.TreeNode;
+
+/**
+ * Base class of executable query operators, which are composed into a logical query plan. TODO:
+ * consider how to restrict the children type for each type of InternalOperator.
+ *
+ * @author xingtanzjr
+ */
+public abstract class InternalOperator<T> extends TreeNode<InternalOperator<?>> {
+  // Returns whether this operator has more results to produce.
+  public abstract boolean hasNext();
+
+  // Returns the next result batch of this operator, or null if there are no more results.
+  public abstract T getNextBatch();
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
new file mode 100644
index 0000000..9075f64
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java
@@ -0,0 +1,24 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * LimitOperator keeps only the first n results, in the default order of its upstream operators.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class LimitOperator extends InternalOperator<Tablet> {
+
+  // Maximum number of results to emit (the n of `limit n`)
+  private int limit;
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
new file mode 100644
index 0000000..c6f79ac
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+
+/**
+ * OffsetOperator skips the first n results from its upstream operators, following the default
+ * order of those upstream operators.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class OffsetOperator extends InternalOperator<Tablet> {
+
+  // The number of leading results to skip (the n of `offset n`)
+  private int offset;
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
new file mode 100644
index 0000000..96a5c86
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter;
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo;
+
+/**
+ * SeriesAggregateOperator performs the aggregation calculation for a single series. It splits the
+ * data of that series into groups by time range and computes the aggregation for each group.
+ * Without a split parameter (groupByTimeParameter is null), it returns a single result: the
+ * aggregation of all data in the current series.
+ *
+ * <p>Children type: [SeriesScanOperator]
+ */
+public class SeriesAggregateOperator extends InternalOperator<SeriesBatchAggInfo> {
+
+  // TODO: consider how to represent the aggregation function and its implementation.
+  // For now, a String names the aggregation function.
+  private String aggregationFunc;
+
+  // Parameter of the `group by time` clause.
+  // It is null when the query has no `group by time` clause.
+  private GroupByTimeParameter groupByTimeParameter;
+
+  @Override
+  public SeriesBatchAggInfo getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
new file mode 100644
index 0000000..fa83be7
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+
+/**
+ * SortOperator normally should not appear in an optimized logical query plan: its sort parameter
+ * is expected to be pushed down to the upstream operators instead.
+ */
+public class SortOperator extends InternalOperator<Tablet> {
+  // The order in which results are sorted.
+  private TraversalOrder sortOrder;
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
new file mode 100644
index 0000000..c4ae6f3
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java
@@ -0,0 +1,33 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+
+/**
+ * TimeJoinOperator joins two or more series. The join works like an outer join on the timestamp
+ * column, and its output is sorted by timestamp.
+ *
+ * <p>Children type: [SeriesScanOperator]
+ */
+public class TimeJoinOperator extends InternalOperator<Tablet> {
+
+  // The policy deciding whether a row should be discarded. It can be pushed down to this operator
+  // because whether a row contains null is already known here, and the downstream operators
+  // will not change that.
+  // NOTE(review): confirm this still holds when a FillOperator sits downstream of this operator.
+  private WithoutPolicy withoutPolicy;
+
+  // The order used when executing the multiway merge sort.
+  private TraversalOrder mergeOrder;
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
new file mode 100644
index 0000000..535a3d6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java
@@ -0,0 +1,25 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.Tablet;
+import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy;
+
+/**
+ * WithoutOperator discards results from its upstream operators according to its WithoutPolicy.
+ *
+ * <p>Children type: [All the operators whose result set is Tablet]
+ */
+public class WithoutOperator extends InternalOperator<Tablet> {
+
+  // Decides which results coming from the upstream operators are dropped
+  private WithoutPolicy discardPolicy;
+
+  @Override
+  public Tablet getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
new file mode 100644
index 0000000..9cd6043
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.cluster.query.distribution.operator;
+
+import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData;
+import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+/**
+ * SeriesScanOperator reads the raw data and the pre-aggregated statistics of a specific series.
+ * Raw data is read batch by batch, and the filter and other information can be used to shrink the
+ * result set. The operator can also read the pre-aggregated statistics in TsFile and return
+ * them one fixed time range at a time. If the requested time range is narrower than the smallest
+ * pre-aggregated statistic, or overlaps a pre-aggregated statistic, the operator falls back to
+ * reading the raw data and computes the aggregation result for that time range itself.
+ * (Design intent only: every method below is currently a stub.)
+ *
+ * <p>Children type: []
+ */
+public class SeriesScanOperator extends InternalOperator<SeriesBatchData> {
+
+  // Path of the series this operator scans.
+  private Path seriesPath;
+
+  // Filter applied to the data of the current series.
+  private Filter filter;
+
+  // Traversal order of the data.
+  // Only TIMESTAMP_ASC and TIMESTAMP_DESC are supported for now.
+  // Defaults to TIMESTAMP_ASC, i.e. "order by timestamp asc".
+  private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC;
+
+  // Maximum size of the result set; the default of -1 means unlimited.
+  private int limit = -1;
+
+  // Number of results to skip; defaults to 0.
+  private int offset;
+
+  // Returns the statistics of the series within the given time range.
+  // This method is only invoked by SeriesAggregateOperator. The statistics should be computed in
+  // the most optimized way available (e.g. from pre-aggregated statistics); reading the raw data
+  // is the last resort.
+  // Currently a stub that always returns null.
+  public Statistics<?> getNextStatisticBetween(TimeRange timeRange) {
+    return null;
+  }
+
+  @Override
+  public SeriesBatchData getNextBatch() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+}