You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/09 12:43:22 UTC

[iotdb] 01/03: add aggregate Operator

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregationOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1938ed555873cfa02d1a324ba8d734e2e1899b15
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon May 9 11:37:46 2022 +0800

    add aggregate Operator
---
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  1 +
 .../operator/process/AggregateOperator.java        | 91 ++++++++++++++++++++--
 ...Operator.java => RawDataAggregateOperator.java} | 42 +++++++++-
 .../source/SeriesAggregateScanOperator.java        | 41 ++++------
 4 files changed, 139 insertions(+), 36 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index a0a7b87f1b..f083f65fe9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -105,6 +105,7 @@ public class Aggregator {
   }
 
   public void reset() {
+    timeRange = new TimeRange(0, Long.MAX_VALUE);
     accumulator.reset();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
index 71e7817b94..9926c46496 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
@@ -21,23 +21,60 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator;
+
+/**
+ * AggregateOperator merges intermediate aggregation results from its children and outputs one
+ * result per time interval. Each intermediate input tsBlock is expected to contain the result of
+ * exactly one time interval.
+ */
 public class AggregateOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final List<Aggregator> aggregators;
   private final List<Operator> children;
 
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private ITimeRangeIterator timeRangeIterator;
+  // current interval of aggregation window [curStartTime, curEndTime)
+  private TimeRange curTimeRange;
+
   public AggregateOperator(
-      OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+      OperatorContext operatorContext,
+      List<Aggregator> aggregators,
+      List<Operator> children,
+      boolean ascending,
+      GroupByTimeParameter groupByTimeParameter) {
     this.operatorContext = operatorContext;
     this.aggregators = aggregators;
     this.children = children;
+
+    this.inputOperatorsCount = children.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
   }
 
   @Override
@@ -47,26 +84,68 @@ public class AggregateOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      ListenableFuture<Void> blocked = children.get(i).isBlocked();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+    }
+    return NOT_BLOCKED;
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    // update input tsBlock
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      inputTsBlocks[i] = children.get(i).next();
+    }
+    // consume current input tsBlocks
+    for (Aggregator aggregator : aggregators) {
+      aggregator.reset();
+      aggregator.processTsBlocks(inputTsBlocks);
+    }
+    // output result from aggregator
+    return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, curTimeRange);
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    if (!timeRangeIterator.hasNextTimeRange()) {
+      return false;
+    }
+    curTimeRange = timeRangeIterator.nextTimeRange();
+    return true;
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    for (Operator child : children) {
+      child.close();
+    }
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return !this.hasNext();
+  }
+
+  public static TsBlock updateResultTsBlockFromAggregators(
+      TsBlockBuilder tsBlockBuilder, List<Aggregator> aggregators, TimeRange curTimeRange) {
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    // Use start time of current time range as time column
+    timeColumnBuilder.writeLong(curTimeRange.getMin());
+    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    int columnIndex = 0;
+    for (Aggregator aggregator : aggregators) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+      columnBuilder[0] = columnBuilders[columnIndex++];
+      if (columnBuilder.length > 1) {
+        columnBuilder[1] = columnBuilders[columnIndex++];
+      }
+      aggregator.outputResult(columnBuilder);
+    }
+    tsBlockBuilder.declarePosition();
+    return tsBlockBuilder.build();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
index 71e7817b94..5fb705dc3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
@@ -16,28 +16,64 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-public class AggregateOperator implements ProcessOperator {
+import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator;
+
+/**
+ * RawDataAggregateOperator aggregates raw-data tsBlock inputs, i.e. data computed with a value
+ * filter applied. One time interval may span more than one input tsBlock, and a single tsBlock
+ * may also cover multiple time intervals.
+ */
+public class RawDataAggregateOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final List<Aggregator> aggregators;
   private final List<Operator> children;
 
-  public AggregateOperator(
-      OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private ITimeRangeIterator timeRangeIterator;
+  // current interval of aggregation window [curStartTime, curEndTime)
+  private TimeRange curTimeRange;
+
+  public RawDataAggregateOperator(
+      OperatorContext operatorContext,
+      List<Aggregator> aggregators,
+      List<Operator> children,
+      boolean ascending,
+      GroupByTimeParameter groupByTimeParameter) {
     this.operatorContext = operatorContext;
     this.aggregators = aggregators;
     this.children = children;
+
+    this.inputOperatorsCount = children.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index fdc50a808b..f86b5e5deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregateOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
@@ -33,8 +34,6 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -101,7 +100,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
   }
 
   /**
@@ -109,7 +108,8 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
    * Aggregation query has only one time window and the result set of it does not contain a
    * timestamp, so it doesn't matter what the time range returns.
    */
-  public ITimeRangeIterator initTimeRangeIterator(GroupByTimeParameter groupByTimeParameter) {
+  public static ITimeRangeIterator initTimeRangeIterator(
+      GroupByTimeParameter groupByTimeParameter, boolean ascending) {
     if (groupByTimeParameter == null) {
       return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
     } else {
@@ -164,19 +164,19 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
 
       // 2. Calculate aggregation result based on current time window
       if (calcFromCacheData(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
       // read page data firstly
       if (readAndCalcFromPage(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
       // read chunk data secondly
       if (readAndCalcFromChunk(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
@@ -185,7 +185,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
         Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
         if (fileStatistics.getStartTime() >= curTimeRange.getMax()) {
           if (ascending) {
-            updateResultTsBlockUsingAggregateResult();
+            updateResultTsBlockFromAggregators();
             return true;
           } else {
             seriesScanUtil.skipCurrentFile();
@@ -202,35 +202,22 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
 
         // read chunk
         if (readAndCalcFromChunk(curTimeRange)) {
-          updateResultTsBlockUsingAggregateResult();
+          updateResultTsBlockFromAggregators();
           return true;
         }
       }
 
-      updateResultTsBlockUsingAggregateResult();
+      updateResultTsBlockFromAggregators();
       return true;
     } catch (IOException e) {
       throw new RuntimeException("Error while scanning the file", e);
     }
   }
 
-  private void updateResultTsBlockUsingAggregateResult() {
-    tsBlockBuilder.reset();
-    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    // Use start time of current time range as time column
-    timeColumnBuilder.writeLong(curTimeRange.getMin());
-    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    int columnIndex = 0;
-    for (Aggregator aggregator : aggregators) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
-      columnBuilder[0] = columnBuilders[columnIndex++];
-      if (columnBuilder.length > 1) {
-        columnBuilder[1] = columnBuilders[columnIndex++];
-      }
-      aggregator.outputResult(columnBuilder);
-    }
-    tsBlockBuilder.declarePosition();
-    resultTsBlock = tsBlockBuilder.build();
+  private void updateResultTsBlockFromAggregators() {
+    resultTsBlock =
+        AggregateOperator.updateResultTsBlockFromAggregators(
+            tsBlockBuilder, aggregators, curTimeRange);
     hasCachedTsBlock = true;
   }