You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/09 12:43:22 UTC
[iotdb] 01/03: add aggregate Operator
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch aggregationOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1938ed555873cfa02d1a324ba8d734e2e1899b15
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon May 9 11:37:46 2022 +0800
add aggregate Operator
---
.../iotdb/db/mpp/aggregation/Aggregator.java | 1 +
.../operator/process/AggregateOperator.java | 91 ++++++++++++++++++++--
...Operator.java => RawDataAggregateOperator.java} | 42 +++++++++-
.../source/SeriesAggregateScanOperator.java | 41 ++++------
4 files changed, 139 insertions(+), 36 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index a0a7b87f1b..f083f65fe9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -105,6 +105,7 @@ public class Aggregator {
}
public void reset() {
+ timeRange = new TimeRange(0, Long.MAX_VALUE);
accumulator.reset();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
index 71e7817b94..9926c46496 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
@@ -21,23 +21,60 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator;
+
+/**
+ * AggregateOperator processes the aggregation of intermediate aggregate results and outputs
+ * one result per time interval. Each intermediate tsBlock input contains the result of exactly
+ * one time interval.
+ */
public class AggregateOperator implements ProcessOperator {
private final OperatorContext operatorContext;
private final List<Aggregator> aggregators;
private final List<Operator> children;
+ private final int inputOperatorsCount;
+ private final TsBlock[] inputTsBlocks;
+ private final TsBlockBuilder tsBlockBuilder;
+
+ private ITimeRangeIterator timeRangeIterator;
+ // current interval of aggregation window [curStartTime, curEndTime)
+ private TimeRange curTimeRange;
+
public AggregateOperator(
- OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+ OperatorContext operatorContext,
+ List<Aggregator> aggregators,
+ List<Operator> children,
+ boolean ascending,
+ GroupByTimeParameter groupByTimeParameter) {
this.operatorContext = operatorContext;
this.aggregators = aggregators;
this.children = children;
+
+ this.inputOperatorsCount = children.size();
+ this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (Aggregator aggregator : aggregators) {
+ dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+ }
+ tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
}
@Override
@@ -47,26 +84,68 @@ public class AggregateOperator implements ProcessOperator {
@Override
public ListenableFuture<Void> isBlocked() {
- return ProcessOperator.super.isBlocked();
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ ListenableFuture<Void> blocked = children.get(i).isBlocked();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ }
+ return NOT_BLOCKED;
}
@Override
public TsBlock next() {
- return null;
+ // update input tsBlock
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ inputTsBlocks[i] = children.get(i).next();
+ }
+ // consume current input tsBlocks
+ for (Aggregator aggregator : aggregators) {
+ aggregator.reset();
+ aggregator.processTsBlocks(inputTsBlocks);
+ }
+ // output result from aggregator
+ return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, curTimeRange);
}
@Override
public boolean hasNext() {
- return false;
+ if (!timeRangeIterator.hasNextTimeRange()) {
+ return false;
+ }
+ curTimeRange = timeRangeIterator.nextTimeRange();
+ return true;
}
@Override
public void close() throws Exception {
- ProcessOperator.super.close();
+ for (Operator child : children) {
+ child.close();
+ }
}
@Override
public boolean isFinished() {
- return false;
+ return !this.hasNext();
+ }
+
+ public static TsBlock updateResultTsBlockFromAggregators(
+ TsBlockBuilder tsBlockBuilder, List<Aggregator> aggregators, TimeRange curTimeRange) {
+ tsBlockBuilder.reset();
+ TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ // Use start time of current time range as time column
+ timeColumnBuilder.writeLong(curTimeRange.getMin());
+ ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ int columnIndex = 0;
+ for (Aggregator aggregator : aggregators) {
+ ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+ columnBuilder[0] = columnBuilders[columnIndex++];
+ if (columnBuilder.length > 1) {
+ columnBuilder[1] = columnBuilders[columnIndex++];
+ }
+ aggregator.outputResult(columnBuilder);
+ }
+ tsBlockBuilder.declarePosition();
+ return tsBlockBuilder.build();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
index 71e7817b94..5fb705dc3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
@@ -16,28 +16,64 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-public class AggregateOperator implements ProcessOperator {
+import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator;
+
+/**
+ * RawDataAggregateOperator processes raw data tsBlock input that is calculated using a value
+ * filter. There may be more than one tsBlock input in one time interval, and one tsBlock may
+ * also cover multiple time intervals.
+ */
+public class RawDataAggregateOperator implements ProcessOperator {
private final OperatorContext operatorContext;
private final List<Aggregator> aggregators;
private final List<Operator> children;
- public AggregateOperator(
- OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+ private final int inputOperatorsCount;
+ private final TsBlock[] inputTsBlocks;
+ private final TsBlockBuilder tsBlockBuilder;
+
+ private ITimeRangeIterator timeRangeIterator;
+ // current interval of aggregation window [curStartTime, curEndTime)
+ private TimeRange curTimeRange;
+
+ public RawDataAggregateOperator(
+ OperatorContext operatorContext,
+ List<Aggregator> aggregators,
+ List<Operator> children,
+ boolean ascending,
+ GroupByTimeParameter groupByTimeParameter) {
this.operatorContext = operatorContext;
this.aggregators = aggregators;
this.children = children;
+
+ this.inputOperatorsCount = children.size();
+ this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (Aggregator aggregator : aggregators) {
+ dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+ }
+ tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index fdc50a808b..f86b5e5deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregateOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
@@ -33,8 +34,6 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import com.google.common.util.concurrent.ListenableFuture;
@@ -101,7 +100,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
tsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
+ this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
}
/**
@@ -109,7 +108,8 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
* Aggregation query has only one time window and the result set of it does not contain a
* timestamp, so it doesn't matter what the time range returns.
*/
- public ITimeRangeIterator initTimeRangeIterator(GroupByTimeParameter groupByTimeParameter) {
+ public static ITimeRangeIterator initTimeRangeIterator(
+ GroupByTimeParameter groupByTimeParameter, boolean ascending) {
if (groupByTimeParameter == null) {
return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
} else {
@@ -164,19 +164,19 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
// 2. Calculate aggregation result based on current time window
if (calcFromCacheData(curTimeRange)) {
- updateResultTsBlockUsingAggregateResult();
+ updateResultTsBlockFromAggregators();
return true;
}
// read page data firstly
if (readAndCalcFromPage(curTimeRange)) {
- updateResultTsBlockUsingAggregateResult();
+ updateResultTsBlockFromAggregators();
return true;
}
// read chunk data secondly
if (readAndCalcFromChunk(curTimeRange)) {
- updateResultTsBlockUsingAggregateResult();
+ updateResultTsBlockFromAggregators();
return true;
}
@@ -185,7 +185,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
if (fileStatistics.getStartTime() >= curTimeRange.getMax()) {
if (ascending) {
- updateResultTsBlockUsingAggregateResult();
+ updateResultTsBlockFromAggregators();
return true;
} else {
seriesScanUtil.skipCurrentFile();
@@ -202,35 +202,22 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
// read chunk
if (readAndCalcFromChunk(curTimeRange)) {
- updateResultTsBlockUsingAggregateResult();
+ updateResultTsBlockFromAggregators();
return true;
}
}
- updateResultTsBlockUsingAggregateResult();
+ updateResultTsBlockFromAggregators();
return true;
} catch (IOException e) {
throw new RuntimeException("Error while scanning the file", e);
}
}
- private void updateResultTsBlockUsingAggregateResult() {
- tsBlockBuilder.reset();
- TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
- // Use start time of current time range as time column
- timeColumnBuilder.writeLong(curTimeRange.getMin());
- ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
- int columnIndex = 0;
- for (Aggregator aggregator : aggregators) {
- ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
- columnBuilder[0] = columnBuilders[columnIndex++];
- if (columnBuilder.length > 1) {
- columnBuilder[1] = columnBuilders[columnIndex++];
- }
- aggregator.outputResult(columnBuilder);
- }
- tsBlockBuilder.declarePosition();
- resultTsBlock = tsBlockBuilder.build();
+ private void updateResultTsBlockFromAggregators() {
+ resultTsBlock =
+ AggregateOperator.updateResultTsBlockFromAggregators(
+ tsBlockBuilder, aggregators, curTimeRange);
hasCachedTsBlock = true;
}