You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/03 08:39:25 UTC
[iotdb] branch master updated: [IOTDB-2844] Implementation of Aggregator and Accumulator (#5757)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c6f9d0a085 [IOTDB-2844] Implementation of Aggregator and Accumulator (#5757)
c6f9d0a085 is described below
commit c6f9d0a0854cdd83fa07decf2960245d972cbf46
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue May 3 16:39:21 2022 +0800
[IOTDB-2844] Implementation of Aggregator and Accumulator (#5757)
---
.../iotdb/db/mpp/aggregation/Accumulator.java | 70 +++++
.../db/mpp/aggregation/AccumulatorFactory.java | 71 +++++
.../iotdb/db/mpp/aggregation/Aggregator.java | 122 +++++++
.../iotdb/db/mpp/aggregation/AvgAccumulator.java | 180 +++++++++++
.../iotdb/db/mpp/aggregation/CountAccumulator.java | 101 ++++++
.../db/mpp/aggregation/ExtremeAccumulator.java | 296 +++++++++++++++++
.../db/mpp/aggregation/FirstValueAccumulator.java | 319 +++++++++++++++++++
.../mpp/aggregation/FirstValueDescAccumulator.java | 91 ++++++
.../db/mpp/aggregation/LastValueAccumulator.java | 305 ++++++++++++++++++
.../mpp/aggregation/LastValueDescAccumulator.java | 135 ++++++++
.../db/mpp/aggregation/MaxTimeAccumulator.java | 101 ++++++
.../db/mpp/aggregation/MaxTimeDescAccumulator.java | 57 ++++
.../db/mpp/aggregation/MaxValueAccumulator.java | 270 ++++++++++++++++
.../db/mpp/aggregation/MinTimeAccumulator.java | 105 +++++++
.../MinTimeDescAccumulator.java} | 40 +--
.../db/mpp/aggregation/MinValueAccumulator.java | 270 ++++++++++++++++
.../iotdb/db/mpp/aggregation/SumAccumulator.java | 172 ++++++++++
.../operator/process/AggregateOperator.java | 17 +-
.../operator/process/DeviceViewOperator.java | 2 +-
.../source/SeriesAggregateScanOperator.java | 121 ++++---
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 2 +-
.../plan/parameter/AggregationDescriptor.java | 4 +
.../plan/planner/plan/parameter/InputLocation.java | 4 +-
.../iotdb/db/mpp/aggregation/AccumulatorTest.java | 349 +++++++++++++++++++++
.../operator/SeriesAggregateScanOperatorTest.java | 153 +++++++--
.../iotdb/tsfile/read/common/block/TsBlock.java | 38 ++-
.../read/common/block/column/BinaryColumn.java | 9 +
.../read/common/block/column/BooleanColumn.java | 9 +
.../tsfile/read/common/block/column/Column.java | 6 +
.../read/common/block/column/DoubleColumn.java | 9 +
.../read/common/block/column/FloatColumn.java | 8 +
.../tsfile/read/common/block/column/IntColumn.java | 8 +
.../read/common/block/column/LongColumn.java | 8 +
.../block/column/RunLengthEncodedColumn.java | 8 +
.../read/common/block/column/TimeColumn.java | 8 +
.../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 146 ++++++++-
.../iotdb/tsfile/common/block/TsBlockTest.java | 49 +++
.../iotdb/tsfile/read/common/ColumnTest.java | 322 +++++++++++++++++++
38 files changed, 3850 insertions(+), 135 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
new file mode 100644
index 0000000000..40c8a08d50
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public interface Accumulator {
+
+ /**
+ * Feeds raw data into the accumulator. The array should be laid out as | Time | Value |, i.e.
+ * column[0] is the time column and column[1] is the value column. timeRange carries the time
+ * bounds the caller is currently aggregating over.
+ */
+ void addInput(Column[] column, TimeRange timeRange);
+
+ /**
+ * Merges a partial (intermediate) result into this accumulator. For aggregation functions such
+ * as COUNT and SUM the partial result is a single column; for AVG and last_value it consists of
+ * two columns in a fixed order (for example, AVG expects the count column followed by the sum
+ * column).
+ */
+ void addIntermediate(Column[] partialResult);
+
+ /**
+ * This method can only be used in seriesAggregateScanOperator. Each accumulator reads the
+ * statistics it needs (for example the count, the sum, or the min/max value), depending on its
+ * type.
+ */
+ void addStatistics(Statistics statistics);
+
+ /**
+ * Sets the final result directly. Attention: setFinal should be invoked only once, and
+ * addInput() and addIntermediate() must not be called again afterwards.
+ */
+ void setFinal(Column finalResult);
+
+ /**
+ * Writes the intermediate result. For aggregation functions such as COUNT and SUM the partial
+ * result is a single column, so a single builder is expected; for AVG and last_value two
+ * builders are expected, in a fixed order (for example, AVG writes the count and then the sum).
+ */
+ void outputIntermediate(ColumnBuilder[] tsBlockBuilder);
+
+ /** Writes the final result, which is a single column for any aggregation function. */
+ void outputFinal(ColumnBuilder tsBlockBuilder);
+
+ /** Clears all accumulated state. */
+ void reset();
+
+ /**
+ * This method can only be used in seriesAggregateScanOperator. It reports whether the final
+ * result is already known; e.g. for first_value or last_value in decreasing order, the final
+ * result can be obtained from the first record.
+ */
+ boolean hasFinalResult();
+
+ /** Data types of the intermediate result columns (e.g. INT64 and DOUBLE for AVG). */
+ TSDataType[] getIntermediateType();
+
+ /** Data type of the single final result column. */
+ TSDataType getFinalType();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorFactory.java
new file mode 100644
index 0000000000..ed4a416d9a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AccumulatorFactory {
+
+  // TODO: Are we going to create different seriesScanOperator based on order by sequence?
+  /**
+   * Creates the accumulator that implements the given aggregation function.
+   *
+   * @param aggregationType aggregation function to evaluate
+   * @param tsDataType data type of the aggregated series, passed to the type-dependent
+   *     accumulators
+   * @param ascending selects between the regular and the "Desc" variant of the time- and
+   *     first/last-value accumulators
+   * @return a new accumulator instance
+   * @throws IllegalArgumentException if the aggregation type has no accumulator
+   */
+  public static Accumulator createAccumulator(
+      AggregationType aggregationType, TSDataType tsDataType, boolean ascending) {
+    switch (aggregationType) {
+        // Accumulators that do not depend on the scan order.
+      case COUNT:
+        return new CountAccumulator();
+      case SUM:
+        return new SumAccumulator(tsDataType);
+      case AVG:
+        return new AvgAccumulator(tsDataType);
+      case EXTREME:
+        return new ExtremeAccumulator(tsDataType);
+      case MIN_VALUE:
+        return new MinValueAccumulator(tsDataType);
+      case MAX_VALUE:
+        return new MaxValueAccumulator(tsDataType);
+
+        // Accumulators that have a dedicated implementation for descending scans.
+      case MIN_TIME:
+        if (ascending) {
+          return new MinTimeAccumulator();
+        }
+        return new MinTimeDescAccumulator();
+      case MAX_TIME:
+        if (ascending) {
+          return new MaxTimeAccumulator();
+        }
+        return new MaxTimeDescAccumulator();
+      case FIRST_VALUE:
+        if (ascending) {
+          return new FirstValueAccumulator(tsDataType);
+        }
+        return new FirstValueDescAccumulator(tsDataType);
+      case LAST_VALUE:
+        if (ascending) {
+          return new LastValueAccumulator(tsDataType);
+        }
+        return new LastValueDescAccumulator(tsDataType);
+
+      default:
+        throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
+    }
+  }
+
+  /**
+   * Creates one accumulator per requested aggregation type, in the same order as the input list.
+   * All accumulators share the same series data type and scan order.
+   */
+  public static List<Accumulator> createAccumulators(
+      List<AggregationType> aggregationTypes, TSDataType tsDataType, boolean ascending) {
+    List<Accumulator> result = new ArrayList<>();
+    aggregationTypes.forEach(type -> result.add(createAccumulator(type, tsDataType, ascending)));
+    return result;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
new file mode 100644
index 0000000000..a0a7b87f1b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class Aggregator {
+
+  private final Accumulator accumulator;
+  // In some intermediate result input, inputLocation[] should include two columns
+  private List<InputLocation[]> inputLocationList;
+  private final AggregationStep step;
+
+  private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
+
+  /** Used for SeriesAggregateScanOperator, which needs no input locations. */
+  public Aggregator(Accumulator accumulator, AggregationStep step) {
+    this(accumulator, step, null);
+  }
+
+  /** Used for aggregateOperator; inputLocationList tells where each input column lives. */
+  public Aggregator(
+      Accumulator accumulator, AggregationStep step, List<InputLocation[]> inputLocationList) {
+    this.accumulator = accumulator;
+    this.step = step;
+    this.inputLocationList = inputLocationList;
+  }
+
+  /**
+   * Used for SeriesAggregateScanOperator: feeds the time column and the first value column of the
+   * block to the accumulator. Only valid when the step consumes raw input.
+   */
+  public void processTsBlock(TsBlock tsBlock) {
+    checkArgument(
+        step.isInputRaw(), "Step in SeriesAggregateScanOperator can only process raw input");
+    // TODO Aligned TimeSeries
+    Column[] timeAndValue = tsBlock.getTimeAndValueColumn(0);
+    accumulator.addInput(timeAndValue, timeRange);
+  }
+
+  /**
+   * Used for aggregateOperator: for every configured input location group, hands either raw
+   * time/value columns or intermediate result columns to the accumulator, depending on the step.
+   */
+  public void processTsBlocks(TsBlock[] tsBlock) {
+    for (InputLocation[] locations : inputLocationList) {
+      if (!step.isInputRaw()) {
+        feedIntermediate(tsBlock, locations);
+      } else {
+        feedRaw(tsBlock, locations[0]);
+      }
+    }
+  }
+
+  /** Passes the time column and the located value column of a raw block to the accumulator. */
+  private void feedRaw(TsBlock[] blocks, InputLocation location) {
+    TsBlock source = blocks[location.getTsBlockIndex()];
+    Column[] timeAndValue = {
+      source.getTimeColumn(), source.getColumn(location.getValueColumnIndex())
+    };
+    accumulator.addInput(timeAndValue, timeRange);
+  }
+
+  /** Collects the located partial-result columns and merges them into the accumulator. */
+  private void feedIntermediate(TsBlock[] blocks, InputLocation[] locations) {
+    Column[] partial = new Column[locations.length];
+    int slot = 0;
+    for (InputLocation location : locations) {
+      partial[slot++] =
+          blocks[location.getTsBlockIndex()].getColumn(location.getValueColumnIndex());
+    }
+    accumulator.addIntermediate(partial);
+  }
+
+  /** Writes the partial or the final result, depending on what the step outputs. */
+  public void outputResult(ColumnBuilder[] columnBuilder) {
+    if (!step.isOutputPartial()) {
+      accumulator.outputFinal(columnBuilder[0]);
+      return;
+    }
+    accumulator.outputIntermediate(columnBuilder);
+  }
+
+  public void processStatistics(Statistics statistics) {
+    accumulator.addStatistics(statistics);
+  }
+
+  /** Returns the intermediate column types for a partial step, else the single final type. */
+  public TSDataType[] getOutputType() {
+    if (!step.isOutputPartial()) {
+      return new TSDataType[] {accumulator.getFinalType()};
+    }
+    return accumulator.getIntermediateType();
+  }
+
+  public void reset() {
+    accumulator.reset();
+  }
+
+  public boolean hasFinalResult() {
+    return accumulator.hasFinalResult();
+  }
+
+  public void setTimeRange(TimeRange timeRange) {
+    this.timeRange = timeRange;
+  }
+
+  public TimeRange getTimeRange() {
+    return timeRange;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
new file mode 100644
index 0000000000..3c726f9603
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class AvgAccumulator implements Accumulator {
+
+ private TSDataType seriesDataType;
+ private long countValue;
+ private double sumValue;
+
+ public AvgAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ }
+
+ @Override
+ public void addInput(Column[] column, TimeRange timeRange) {
+ switch (seriesDataType) {
+ case INT32:
+ addIntInput(column, timeRange);
+ break;
+ case INT64:
+ addLongInput(column, timeRange);
+ break;
+ case FLOAT:
+ addFloatInput(column, timeRange);
+ break;
+ case DOUBLE:
+ addDoubleInput(column, timeRange);
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in aggregation AVG : %s", seriesDataType));
+ }
+ }
+
+ // partialResult should be like: | countValue1 | sumValue1 |
+ @Override
+ public void addIntermediate(Column[] partialResult) {
+ checkArgument(partialResult.length == 2, "partialResult of Avg should be 2");
+ countValue += partialResult[0].getLong(0);
+ sumValue += partialResult[1].getDouble(0);
+ }
+
+ @Override
+ public void addStatistics(Statistics statistics) {
+ countValue += statistics.getCount();
+ if (statistics instanceof IntegerStatistics) {
+ sumValue += statistics.getSumLongValue();
+ } else {
+ sumValue += statistics.getSumDoubleValue();
+ }
+ }
+
+ // Set sumValue to finalResult and keep countValue equals to 1
+ @Override
+ public void setFinal(Column finalResult) {
+ reset();
+ countValue = 1;
+ sumValue = finalResult.getDouble(0);
+ }
+
+ @Override
+ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+ checkArgument(columnBuilders.length == 2, "partialResult of Avg should be 2");
+ columnBuilders[0].writeLong(countValue);
+ columnBuilders[1].writeDouble(sumValue);
+ }
+
+ @Override
+ public void outputFinal(ColumnBuilder columnBuilder) {
+ columnBuilder.writeDouble(sumValue / countValue);
+ }
+
+ @Override
+ public void reset() {
+ this.countValue = 0;
+ this.sumValue = 0.0;
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {TSDataType.INT64, TSDataType.DOUBLE};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return TSDataType.DOUBLE;
+ }
+
+ private void addIntInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ countValue++;
+ sumValue += column[1].getInt(i);
+ }
+ }
+ }
+
+ private void addLongInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ countValue++;
+ sumValue += column[1].getLong(i);
+ }
+ }
+ }
+
+ private void addFloatInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ countValue++;
+ sumValue += column[1].getFloat(i);
+ }
+ }
+ }
+
+ private void addDoubleInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ countValue++;
+ sumValue += column[1].getDouble(i);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
new file mode 100644
index 0000000000..c2825b00bb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CountAccumulator implements Accumulator {
+
+  private long countValue = 0;
+
+  public CountAccumulator() {}
+
+  /**
+   * Counts the non-null values of the value column. Column should be like: | Time | Value |.
+   * Scanning stops at the first timestamp that lies outside the given time range.
+   */
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    TimeColumn times = (TimeColumn) column[0];
+    for (int row = 0; row < times.getPositionCount(); row++) {
+      long timestamp = times.getLong(row);
+      boolean insideRange = timestamp < timeRange.getMax() && timestamp >= timeRange.getMin();
+      if (!insideRange) {
+        break;
+      }
+      if (column[1].isNull(row)) {
+        continue;
+      }
+      countValue++;
+    }
+  }
+
+  /** Adds a partial count; partialResult should be like: | partialCountValue1 |. */
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of Count should be 1");
+    long partialCount = partialResult[0].getLong(0);
+    countValue += partialCount;
+  }
+
+  /** Adds the count recorded in the statistics. */
+  @Override
+  public void addStatistics(Statistics statistics) {
+    countValue += statistics.getCount();
+  }
+
+  /** Replaces the count; finalResult should be single column, like: | finalCountValue |. */
+  @Override
+  public void setFinal(Column finalResult) {
+    this.countValue = finalResult.getLong(0);
+  }
+
+  /** Writes the current count; columnBuilder should be single in countAccumulator. */
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of Count should be 1");
+    ColumnBuilder countBuilder = columnBuilders[0];
+    countBuilder.writeLong(countValue);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(countValue);
+  }
+
+  @Override
+  public void reset() {
+    countValue = 0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
new file mode 100644
index 0000000000..e70a7f484a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Accumulator for the EXTREME aggregation: keeps the value with the largest absolute value seen
+ * so far. When two values have the same absolute value, the larger (i.e. positive) one wins.
+ * Only INT32, INT64, FLOAT and DOUBLE series are supported.
+ */
+public class ExtremeAccumulator implements Accumulator {
+
+  private final TSDataType seriesDataType;
+  private TsPrimitiveType extremeResult;
+  // false until the first value has been stored in extremeResult
+  private boolean initResult;
+
+  public ExtremeAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+    this.extremeResult = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  /**
+   * Feeds raw | Time | Value | columns, dispatching on the series data type.
+   *
+   * @throws UnSupportedDataTypeException if the series type is not numeric
+   */
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw unsupportedType();
+    }
+  }
+
+  // partialResult should be like: | PartialExtremeValue |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of ExtremeValue should be 1");
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult(partialResult[0].getInt(0));
+        break;
+      case INT64:
+        updateLongResult(partialResult[0].getLong(0));
+        break;
+      case FLOAT:
+        updateFloatResult(partialResult[0].getFloat(0));
+        break;
+      case DOUBLE:
+        updateDoubleResult(partialResult[0].getDouble(0));
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw unsupportedType();
+    }
+  }
+
+  /**
+   * Considers both the maximum and the minimum value of the statistics, because either of them
+   * may be the one with the largest absolute value.
+   */
+  @Override
+  public void addStatistics(Statistics statistics) {
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult((int) statistics.getMaxValue());
+        updateIntResult((int) statistics.getMinValue());
+        break;
+      case INT64:
+        updateLongResult((long) statistics.getMaxValue());
+        updateLongResult((long) statistics.getMinValue());
+        break;
+      case FLOAT:
+        updateFloatResult((float) statistics.getMaxValue());
+        updateFloatResult((float) statistics.getMinValue());
+        break;
+      case DOUBLE:
+        updateDoubleResult((double) statistics.getMaxValue());
+        updateDoubleResult((double) statistics.getMinValue());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw unsupportedType();
+    }
+  }
+
+  /** Stores the first value of finalResult as the result. */
+  @Override
+  public void setFinal(Column finalResult) {
+    extremeResult.setObject(finalResult.getObject(0));
+    // A value is stored now, so the accumulator must no longer count as uninitialised.
+    initResult = true;
+  }
+
+  // columnBuilder should be single in ExtremeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of ExtremeValue should be 1");
+    writeResult(columnBuilders[0]);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    writeResult(columnBuilder);
+  }
+
+  @Override
+  public void reset() {
+    initResult = false;
+    extremeResult.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {extremeResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return extremeResult.getDataType();
+  }
+
+  /** Writes the current result with the writer method matching the series data type. */
+  private void writeResult(ColumnBuilder columnBuilder) {
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt(extremeResult.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(extremeResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(extremeResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(extremeResult.getDouble());
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw unsupportedType();
+    }
+  }
+
+  /** Builds the exception reported for non-numeric series types. */
+  private UnSupportedDataTypeException unsupportedType() {
+    return new UnSupportedDataTypeException(
+        String.format("Unsupported data type in Extreme: %s", seriesDataType));
+  }
+
+  /**
+   * Returns true if candidate should replace current: it has the larger absolute value, or the
+   * same absolute value and is the larger number. Magnitudes are compared through their negated
+   * absolute values, which are always representable, because Math.abs overflows for
+   * Long.MIN_VALUE (and for Integer.MIN_VALUE when computed as an int).
+   */
+  private static boolean isMoreExtreme(long candidate, long current) {
+    long negCandidateMagnitude = candidate < 0 ? candidate : -candidate;
+    long negCurrentMagnitude = current < 0 ? current : -current;
+    if (negCandidateMagnitude != negCurrentMagnitude) {
+      return negCandidateMagnitude < negCurrentMagnitude;
+    }
+    return candidate > current;
+  }
+
+  /**
+   * Floating-point counterpart of the integral comparison. Math.abs cannot overflow here. Float
+   * values are widened to double exactly, so the comparison result is the same as in float.
+   */
+  private static boolean isMoreExtreme(double candidate, double current) {
+    double absCandidate = Math.abs(candidate);
+    double absCurrent = Math.abs(current);
+    return absCandidate > absCurrent || (absCandidate == absCurrent && candidate > current);
+  }
+
+  /** Scans INT32 values; stops at the first timestamp outside the time range, skips nulls. */
+  private void addIntInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i));
+      }
+    }
+  }
+
+  private void updateIntResult(int extVal) {
+    if (!initResult || isMoreExtreme((long) extVal, (long) extremeResult.getInt())) {
+      initResult = true;
+      extremeResult.setInt(extVal);
+    }
+  }
+
+  /** Scans INT64 values; stops at the first timestamp outside the time range, skips nulls. */
+  private void addLongInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i));
+      }
+    }
+  }
+
+  private void updateLongResult(long extVal) {
+    if (!initResult || isMoreExtreme(extVal, extremeResult.getLong())) {
+      initResult = true;
+      extremeResult.setLong(extVal);
+    }
+  }
+
+  /** Scans FLOAT values; stops at the first timestamp outside the time range, skips nulls. */
+  private void addFloatInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i));
+      }
+    }
+  }
+
+  private void updateFloatResult(float extVal) {
+    if (!initResult || isMoreExtreme((double) extVal, (double) extremeResult.getFloat())) {
+      initResult = true;
+      extremeResult.setFloat(extVal);
+    }
+  }
+
+  /** Scans DOUBLE values; stops at the first timestamp outside the time range, skips nulls. */
+  private void addDoubleInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i));
+      }
+    }
+  }
+
+  private void updateDoubleResult(double extVal) {
+    if (!initResult || isMoreExtreme(extVal, extremeResult.getDouble())) {
+      initResult = true;
+      extremeResult.setDouble(extVal);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
new file mode 100644
index 0000000000..45bce52036
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class FirstValueAccumulator implements Accumulator {
+
+ protected final TSDataType seriesDataType;
+ protected boolean hasCandidateResult;
+ protected TsPrimitiveType firstValue;
+ protected long minTime = Long.MAX_VALUE;
+
+ public FirstValueAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ firstValue = TsPrimitiveType.getByType(seriesDataType);
+ }
+
+ // Column should be like: | Time | Value |
+ @Override
+ public void addInput(Column[] column, TimeRange timeRange) {
+ switch (seriesDataType) {
+ case INT32:
+ addIntInput(column, timeRange);
+ break;
+ case INT64:
+ addLongInput(column, timeRange);
+ break;
+ case FLOAT:
+ addFloatInput(column, timeRange);
+ break;
+ case DOUBLE:
+ addDoubleInput(column, timeRange);
+ break;
+ case TEXT:
+ addBinaryInput(column, timeRange);
+ break;
+ case BOOLEAN:
+ addBooleanInput(column, timeRange);
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in FirstValue: %s", seriesDataType));
+ }
+ }
+
+ // partialResult should be like: | FirstValue | MinTime |
+ @Override
+ public void addIntermediate(Column[] partialResult) {
+ checkArgument(partialResult.length == 2, "partialResult of FirstValue should be 2");
+ switch (seriesDataType) {
+ case INT32:
+ updateIntFirstValue(partialResult[0].getInt(0), partialResult[1].getLong(0));
+ break;
+ case INT64:
+ updateLongFirstValue(partialResult[0].getLong(0), partialResult[1].getLong(0));
+ break;
+ case FLOAT:
+ updateFloatFirstValue(partialResult[0].getFloat(0), partialResult[1].getLong(0));
+ break;
+ case DOUBLE:
+ updateDoubleFirstValue(partialResult[0].getDouble(0), partialResult[1].getLong(0));
+ break;
+ case TEXT:
+ updateBinaryFirstValue(partialResult[0].getBinary(0), partialResult[1].getLong(0));
+ break;
+ case BOOLEAN:
+ updateBooleanFirstValue(partialResult[0].getBoolean(0), partialResult[1].getLong(0));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in FirstValue: %s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void addStatistics(Statistics statistics) {
+ switch (seriesDataType) {
+ case INT32:
+ updateIntFirstValue((int) statistics.getFirstValue(), statistics.getStartTime());
+ break;
+ case INT64:
+ updateLongFirstValue((long) statistics.getFirstValue(), statistics.getStartTime());
+ break;
+ case FLOAT:
+ updateFloatFirstValue((float) statistics.getFirstValue(), statistics.getStartTime());
+ break;
+ case DOUBLE:
+ updateDoubleFirstValue((double) statistics.getFirstValue(), statistics.getStartTime());
+ break;
+ case TEXT:
+ updateBinaryFirstValue((Binary) statistics.getFirstValue(), statistics.getStartTime());
+ case BOOLEAN:
+ updateBooleanFirstValue((boolean) statistics.getFirstValue(), statistics.getStartTime());
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in FirstValue: %s", seriesDataType));
+ }
+ }
+
+ // finalResult should be single column, like: | finalFirstValue |
+ @Override
+ public void setFinal(Column finalResult) {
+ reset();
+ firstValue.setObject(finalResult.getObject(0));
+ }
+
+ // columnBuilder should be double in FirstValueAccumulator
+ @Override
+ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+ checkArgument(columnBuilders.length == 2, "partialResult of FirstValue should be 2");
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilders[0].writeInt(firstValue.getInt());
+ break;
+ case INT64:
+ columnBuilders[0].writeLong(firstValue.getLong());
+ break;
+ case FLOAT:
+ columnBuilders[0].writeFloat(firstValue.getFloat());
+ break;
+ case DOUBLE:
+ columnBuilders[0].writeDouble(firstValue.getDouble());
+ break;
+ case TEXT:
+ columnBuilders[0].writeBinary(firstValue.getBinary());
+ break;
+ case BOOLEAN:
+ columnBuilders[0].writeBoolean(firstValue.getBoolean());
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in Extreme: %s", seriesDataType));
+ }
+ columnBuilders[1].writeLong(minTime);
+ }
+
+ @Override
+ public void outputFinal(ColumnBuilder columnBuilder) {
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilder.writeInt(firstValue.getInt());
+ break;
+ case INT64:
+ columnBuilder.writeLong(firstValue.getLong());
+ break;
+ case FLOAT:
+ columnBuilder.writeFloat(firstValue.getFloat());
+ break;
+ case DOUBLE:
+ columnBuilder.writeDouble(firstValue.getDouble());
+ break;
+ case TEXT:
+ columnBuilder.writeBinary(firstValue.getBinary());
+ break;
+ case BOOLEAN:
+ columnBuilder.writeBoolean(firstValue.getBoolean());
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in Extreme: %s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void reset() {
+ hasCandidateResult = false;
+ this.minTime = Long.MAX_VALUE;
+ this.firstValue.reset();
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return hasCandidateResult;
+ }
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {firstValue.getDataType(), TSDataType.INT64};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return firstValue.getDataType();
+ }
+
+ protected void addIntInput(Column[] column, TimeRange timeRange) {
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateIntFirstValue(column[1].getInt(i), curTime);
+ break;
+ }
+ }
+ }
+
+ protected void updateIntFirstValue(int value, long curTime) {
+ hasCandidateResult = true;
+ if (curTime < minTime) {
+ minTime = curTime;
+ firstValue.setInt(value);
+ }
+ }
+
+ protected void addLongInput(Column[] column, TimeRange timeRange) {
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateLongFirstValue(column[1].getLong(i), curTime);
+ break;
+ }
+ }
+ }
+
+ protected void updateLongFirstValue(long value, long curTime) {
+ hasCandidateResult = true;
+ if (curTime < minTime) {
+ minTime = curTime;
+ firstValue.setLong(value);
+ }
+ }
+
+ protected void addFloatInput(Column[] column, TimeRange timeRange) {
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateFloatFirstValue(column[1].getFloat(i), curTime);
+ break;
+ }
+ }
+ }
+
+ protected void updateFloatFirstValue(float value, long curTime) {
+ hasCandidateResult = true;
+ if (curTime < minTime) {
+ minTime = curTime;
+ firstValue.setFloat(value);
+ }
+ }
+
+ protected void addDoubleInput(Column[] column, TimeRange timeRange) {
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateDoubleFirstValue(column[1].getDouble(i), curTime);
+ break;
+ }
+ }
+ }
+
+ protected void updateDoubleFirstValue(double value, long curTime) {
+ hasCandidateResult = true;
+ if (curTime < minTime) {
+ minTime = curTime;
+ firstValue.setDouble(value);
+ }
+ }
+
+ protected void addBooleanInput(Column[] column, TimeRange timeRange) {
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateBooleanFirstValue(column[1].getBoolean(i), curTime);
+ break;
+ }
+ }
+ }
+
+ protected void updateBooleanFirstValue(boolean value, long curTime) {
+ hasCandidateResult = true;
+ if (curTime < minTime) {
+ minTime = curTime;
+ firstValue.setBoolean(value);
+ }
+ }
+
+ protected void addBinaryInput(Column[] column, TimeRange timeRange) {
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateBinaryFirstValue(column[1].getBinary(i), curTime);
+ break;
+ }
+ }
+ }
+
+ protected void updateBinaryFirstValue(Binary value, long curTime) {
+ hasCandidateResult = true;
+ if (curTime < minTime) {
+ minTime = curTime;
+ firstValue.setBinary(value);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
new file mode 100644
index 0000000000..08c04b8185
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+/**
+ * Variant of {@link FirstValueAccumulator} that examines every row of a block instead of stopping
+ * at the first valid one, and that never reports a final result early.
+ *
+ * <p>NOTE(review): presumably used when input arrives in descending time order, so the earliest
+ * row is seen last - confirm against the factory that selects this class.
+ */
+public class FirstValueDescAccumulator extends FirstValueAccumulator {
+
+  public FirstValueDescAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  /** Always false: a later block may still carry an earlier timestamp. */
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  // Unlike the parent class, the loops below do not break after the first valid row; the parent's
+  // update*FirstValue methods keep whichever offered row has the smallest timestamp.
+
+  @Override
+  protected void addIntInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateIntFirstValue(column[1].getInt(i), curTime);
+      }
+    }
+  }
+
+  @Override
+  protected void addLongInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateLongFirstValue(column[1].getLong(i), curTime);
+      }
+    }
+  }
+
+  @Override
+  protected void addFloatInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateFloatFirstValue(column[1].getFloat(i), curTime);
+      }
+    }
+  }
+
+  @Override
+  protected void addDoubleInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateDoubleFirstValue(column[1].getDouble(i), curTime);
+      }
+    }
+  }
+
+  @Override
+  protected void addBooleanInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBooleanFirstValue(column[1].getBoolean(i), curTime);
+      }
+    }
+  }
+
+  @Override
+  protected void addBinaryInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBinaryFirstValue(column[1].getBinary(i), curTime);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
new file mode 100644
index 0000000000..db418a490e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Accumulator for the last-value aggregation. It keeps the value that carries the largest
+ * timestamp offered so far, together with that timestamp.
+ *
+ * <p>The intermediate (partial) result has two columns: {@code | LastValue | MaxTime |}.
+ */
+public class LastValueAccumulator implements Accumulator {
+
+  protected final TSDataType seriesDataType;
+  protected TsPrimitiveType lastValue;
+  // Timestamp of the value currently held; Long.MIN_VALUE while nothing has been accepted.
+  protected long maxTime = Long.MIN_VALUE;
+
+  public LastValueAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+    lastValue = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  /**
+   * Consumes a raw block laid out as {@code | Time | Value |}.
+   *
+   * @param column column[0] holds the timestamps, column[1] the values
+   * @param timeRange only rows with {@code min <= time < max} are considered
+   * @throws UnSupportedDataTypeException if the series data type has no branch here
+   */
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+        addBinaryInput(column, timeRange);
+        break;
+      case BOOLEAN:
+        addBooleanInput(column, timeRange);
+        // Without this break BOOLEAN input fell through to default and always threw.
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in LastValue: %s", seriesDataType));
+    }
+  }
+
+  /**
+   * Merges a partial result laid out as {@code | LastValue | MaxTime |}; only row 0 is read.
+   *
+   * @throws IllegalArgumentException if {@code partialResult} does not have exactly two columns
+   * @throws UnSupportedDataTypeException if the series data type has no branch here
+   */
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 2, "partialResult of LastValue should be 2");
+    switch (seriesDataType) {
+      case INT32:
+        updateIntLastValue(partialResult[0].getInt(0), partialResult[1].getLong(0));
+        break;
+      case INT64:
+        updateLongLastValue(partialResult[0].getLong(0), partialResult[1].getLong(0));
+        break;
+      case FLOAT:
+        updateFloatLastValue(partialResult[0].getFloat(0), partialResult[1].getLong(0));
+        break;
+      case DOUBLE:
+        updateDoubleLastValue(partialResult[0].getDouble(0), partialResult[1].getLong(0));
+        break;
+      case TEXT:
+        updateBinaryLastValue(partialResult[0].getBinary(0), partialResult[1].getLong(0));
+        break;
+      case BOOLEAN:
+        updateBooleanLastValue(partialResult[0].getBoolean(0), partialResult[1].getLong(0));
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in LastValue: %s", seriesDataType));
+    }
+  }
+
+  /**
+   * Offers the last value and end time recorded in {@code statistics} as a candidate.
+   *
+   * @throws UnSupportedDataTypeException if the series data type has no branch here
+   */
+  @Override
+  public void addStatistics(Statistics statistics) {
+    switch (seriesDataType) {
+      case INT32:
+        updateIntLastValue((int) statistics.getLastValue(), statistics.getEndTime());
+        break;
+      case INT64:
+        updateLongLastValue((long) statistics.getLastValue(), statistics.getEndTime());
+        break;
+      case FLOAT:
+        updateFloatLastValue((float) statistics.getLastValue(), statistics.getEndTime());
+        break;
+      case DOUBLE:
+        updateDoubleLastValue((double) statistics.getLastValue(), statistics.getEndTime());
+        break;
+      case TEXT:
+        updateBinaryLastValue((Binary) statistics.getLastValue(), statistics.getEndTime());
+        // Without this break TEXT fell through into the boolean cast below.
+        break;
+      case BOOLEAN:
+        updateBooleanLastValue((boolean) statistics.getLastValue(), statistics.getEndTime());
+        // Without this break BOOLEAN fell through to default and always threw.
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in LastValue: %s", seriesDataType));
+    }
+  }
+
+  /**
+   * Resets this accumulator and then stores row 0 of {@code finalResult} as the value. The
+   * timestamp is left in its reset state.
+   */
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    lastValue.setObject(finalResult.getObject(0));
+  }
+
+  /**
+   * Writes the partial result: the value into builder 0 and the timestamp into builder 1.
+   *
+   * @throws IllegalArgumentException if {@code columnBuilders} does not have exactly two entries
+   * @throws UnSupportedDataTypeException if the series data type has no branch here
+   */
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 2, "partialResult of LastValue should be 2");
+    writeLastValue(columnBuilders[0]);
+    columnBuilders[1].writeLong(maxTime);
+  }
+
+  /**
+   * Writes the held value as the final result.
+   *
+   * @throws UnSupportedDataTypeException if the series data type has no branch here
+   */
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    writeLastValue(columnBuilder);
+  }
+
+  /** Writes the held value to {@code columnBuilder} using the writer matching the series type. */
+  private void writeLastValue(ColumnBuilder columnBuilder) {
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt(lastValue.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(lastValue.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(lastValue.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(lastValue.getDouble());
+        break;
+      case TEXT:
+        columnBuilder.writeBinary(lastValue.getBinary());
+        break;
+      case BOOLEAN:
+        columnBuilder.writeBoolean(lastValue.getBoolean());
+        break;
+      default:
+        // The message used to name "Extreme", a leftover from the accumulator it was copied from.
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in LastValue: %s", seriesDataType));
+    }
+  }
+
+  /** Clears the timestamp and the held value. */
+  @Override
+  public void reset() {
+    this.maxTime = Long.MIN_VALUE;
+    this.lastValue.reset();
+  }
+
+  /** Always false: this accumulator never reports a final result early. */
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {lastValue.getDataType(), TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return lastValue.getDataType();
+  }
+
+  /** Returns true when row {@code i} lies in {@code [min, max)} and its value is not null. */
+  private static boolean isValidRow(Column[] column, TimeRange timeRange, int i) {
+    long curTime = column[0].getLong(i);
+    return curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i);
+  }
+
+  // Each add*Input below offers every valid row of the block; each update*LastValue keeps the
+  // offered value only when its timestamp is strictly later than the one already held.
+
+  protected void addIntInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      if (isValidRow(column, timeRange, i)) {
+        updateIntLastValue(column[1].getInt(i), column[0].getLong(i));
+      }
+    }
+  }
+
+  protected void updateIntLastValue(int value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setInt(value);
+    }
+  }
+
+  protected void addLongInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      if (isValidRow(column, timeRange, i)) {
+        updateLongLastValue(column[1].getLong(i), column[0].getLong(i));
+      }
+    }
+  }
+
+  protected void updateLongLastValue(long value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setLong(value);
+    }
+  }
+
+  protected void addFloatInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      if (isValidRow(column, timeRange, i)) {
+        updateFloatLastValue(column[1].getFloat(i), column[0].getLong(i));
+      }
+    }
+  }
+
+  protected void updateFloatLastValue(float value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setFloat(value);
+    }
+  }
+
+  protected void addDoubleInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      if (isValidRow(column, timeRange, i)) {
+        updateDoubleLastValue(column[1].getDouble(i), column[0].getLong(i));
+      }
+    }
+  }
+
+  protected void updateDoubleLastValue(double value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setDouble(value);
+    }
+  }
+
+  protected void addBooleanInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      if (isValidRow(column, timeRange, i)) {
+        updateBooleanLastValue(column[1].getBoolean(i), column[0].getLong(i));
+      }
+    }
+  }
+
+  protected void updateBooleanLastValue(boolean value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setBoolean(value);
+    }
+  }
+
+  protected void addBinaryInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      if (isValidRow(column, timeRange, i)) {
+        updateBinaryLastValue(column[1].getBinary(i), column[0].getLong(i));
+      }
+    }
+  }
+
+  protected void updateBinaryLastValue(Binary value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setBinary(value);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
new file mode 100644
index 0000000000..3360bad7a3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+/**
+ * Variant of {@link LastValueAccumulator} that offers only the first valid row of each block and
+ * reports a final result as soon as any candidate has been offered.
+ *
+ * <p>NOTE(review): presumably used when input arrives in descending time order, so the first
+ * valid row is already the latest one - confirm against the factory that selects this class.
+ */
+public class LastValueDescAccumulator extends LastValueAccumulator {
+
+  // True once any update*LastValue call has happened since the last reset.
+  private boolean hasCandidateResult = false;
+
+  public LastValueDescAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  /** Returns true once any candidate has been offered since the last reset. */
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    super.reset();
+  }
+
+  // Unlike the parent class, the loops below stop after the first row that lies in [min, max)
+  // and has a non-null value.
+
+  @Override
+  protected void addIntInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateIntLastValue(column[1].getInt(i), curTime);
+        break;
+      }
+    }
+  }
+
+  @Override
+  protected void addLongInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateLongLastValue(column[1].getLong(i), curTime);
+        break;
+      }
+    }
+  }
+
+  @Override
+  protected void addFloatInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateFloatLastValue(column[1].getFloat(i), curTime);
+        break;
+      }
+    }
+  }
+
+  @Override
+  protected void addDoubleInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateDoubleLastValue(column[1].getDouble(i), curTime);
+        break;
+      }
+    }
+  }
+
+  @Override
+  protected void addBooleanInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBooleanLastValue(column[1].getBoolean(i), curTime);
+        break;
+      }
+    }
+  }
+
+  @Override
+  protected void addBinaryInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateBinaryLastValue(column[1].getBinary(i), curTime);
+        break;
+      }
+    }
+  }
+
+  // Each override below records that a candidate was offered, then defers to the parent, which
+  // keeps the value only when its timestamp is later than the one already held.
+
+  @Override
+  protected void updateIntLastValue(int value, long curTime) {
+    hasCandidateResult = true;
+    super.updateIntLastValue(value, curTime);
+  }
+
+  @Override
+  protected void updateLongLastValue(long value, long curTime) {
+    hasCandidateResult = true;
+    super.updateLongLastValue(value, curTime);
+  }
+
+  @Override
+  protected void updateFloatLastValue(float value, long curTime) {
+    hasCandidateResult = true;
+    super.updateFloatLastValue(value, curTime);
+  }
+
+  @Override
+  protected void updateDoubleLastValue(double value, long curTime) {
+    hasCandidateResult = true;
+    super.updateDoubleLastValue(value, curTime);
+  }
+
+  @Override
+  protected void updateBooleanLastValue(boolean value, long curTime) {
+    hasCandidateResult = true;
+    super.updateBooleanLastValue(value, curTime);
+  }
+
+  @Override
+  protected void updateBinaryLastValue(Binary value, long curTime) {
+    hasCandidateResult = true;
+    super.updateBinaryLastValue(value, curTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
new file mode 100644
index 0000000000..ce47210337
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MaxTimeAccumulator implements Accumulator {
+
+ protected long maxTime = Long.MIN_VALUE;
+
+ public MaxTimeAccumulator() {}
+
+ // Column should be like: | Time | Value |
+ // Value is used to judge isNull()
+ @Override
+ public void addInput(Column[] column, TimeRange timeRange) {
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateMaxTime(curTime);
+ }
+ }
+ }
+
+ // partialResult should be like: | partialMaxTimeValue |
+ @Override
+ public void addIntermediate(Column[] partialResult) {
+ checkArgument(partialResult.length == 1, "partialResult of MaxTime should be 1");
+ updateMaxTime(partialResult[0].getLong(0));
+ }
+
+ @Override
+ public void addStatistics(Statistics statistics) {
+ updateMaxTime(statistics.getEndTime());
+ }
+
+ // finalResult should be single column, like: | finalMaxTime |
+ @Override
+ public void setFinal(Column finalResult) {
+ maxTime = finalResult.getLong(0);
+ }
+
+ // columnBuilder should be single in maxTimeAccumulator
+ @Override
+ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+ checkArgument(columnBuilders.length == 1, "partialResult of MaxTime should be 1");
+ columnBuilders[0].writeLong(maxTime);
+ }
+
+ @Override
+ public void outputFinal(ColumnBuilder columnBuilder) {
+ columnBuilder.writeLong(maxTime);
+ }
+
+ @Override
+ public void reset() {
+ this.maxTime = Long.MIN_VALUE;
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {TSDataType.INT64};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return TSDataType.INT64;
+ }
+
+ protected void updateMaxTime(long curTime) {
+ maxTime = Math.max(maxTime, curTime);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
new file mode 100644
index 0000000000..b753e9ca3b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+/**
+ * MaxTime accumulator that stops at the first non-null row inside the window and then reports
+ * that a final result is available.
+ *
+ * <p>NOTE(review): the "Desc" name suggests the input arrives in descending time order, which is
+ * what makes the first qualifying row the maximum; confirm against the caller.
+ */
+public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
+
+  // Set as soon as any timestamp has been folded in; cleared by reset().
+  private boolean hasCandidateResult = false;
+
+  // Column should be like: | Time | Value |
+  // Value is used to judge isNull()
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+        updateMaxTime(curTime);
+        // Only the first qualifying row is consumed; the remaining rows are not inspected.
+        break;
+      }
+    }
+  }
+
+  /** Returns {@code true} once at least one timestamp has been recorded since the last reset. */
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    super.reset();
+  }
+
+  /** Records that a candidate exists, then delegates the comparison to the parent class. */
+  @Override
+  protected void updateMaxTime(long curTime) {
+    hasCandidateResult = true;
+    super.updateMaxTime(curTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
new file mode 100644
index 0000000000..c07480a13d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MaxValueAccumulator implements Accumulator {
+
+ private TSDataType seriesDataType;
+ private TsPrimitiveType maxResult;
+ private boolean initResult;
+
+ public MaxValueAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ this.maxResult = TsPrimitiveType.getByType(seriesDataType);
+ }
+
+ // Column should be like: | Time | Value |
+ @Override
+ public void addInput(Column[] column, TimeRange timeRange) {
+ switch (seriesDataType) {
+ case INT32:
+ addIntInput(column, timeRange);
+ break;
+ case INT64:
+ addLongInput(column, timeRange);
+ break;
+ case FLOAT:
+ addFloatInput(column, timeRange);
+ break;
+ case DOUBLE:
+ addDoubleInput(column, timeRange);
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+ }
+ }
+
+ // partialResult should be like: | partialMaxValue1 |
+ @Override
+ public void addIntermediate(Column[] partialResult) {
+ checkArgument(partialResult.length == 1, "partialResult of MaxValue should be 1");
+ switch (seriesDataType) {
+ case INT32:
+ updateIntResult(partialResult[0].getInt(0));
+ break;
+ case INT64:
+ updateLongResult(partialResult[0].getLong(0));
+ break;
+ case FLOAT:
+ updateFloatResult(partialResult[0].getFloat(0));
+ break;
+ case DOUBLE:
+ updateDoubleResult(partialResult[0].getDouble(0));
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void addStatistics(Statistics statistics) {
+ switch (seriesDataType) {
+ case INT32:
+ updateIntResult((int) statistics.getMaxValue());
+ break;
+ case INT64:
+ updateLongResult((long) statistics.getMaxValue());
+ break;
+ case FLOAT:
+ updateFloatResult((float) statistics.getMaxValue());
+ break;
+ case DOUBLE:
+ updateDoubleResult((double) statistics.getMaxValue());
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+ }
+ }
+
+ // finalResult should be single column, like: | finalCountValue |
+ @Override
+ public void setFinal(Column finalResult) {
+ maxResult.setObject(finalResult.getObject(0));
+ }
+
+ // columnBuilder should be single in countAccumulator
+ @Override
+ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+ checkArgument(columnBuilders.length == 1, "partialResult of MaxValue should be 1");
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilders[0].writeInt(maxResult.getInt());
+ break;
+ case INT64:
+ columnBuilders[0].writeLong(maxResult.getLong());
+ break;
+ case FLOAT:
+ columnBuilders[0].writeFloat(maxResult.getFloat());
+ break;
+ case DOUBLE:
+ columnBuilders[0].writeDouble(maxResult.getDouble());
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void outputFinal(ColumnBuilder columnBuilder) {
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilder.writeInt(maxResult.getInt());
+ break;
+ case INT64:
+ columnBuilder.writeLong(maxResult.getLong());
+ break;
+ case FLOAT:
+ columnBuilder.writeFloat(maxResult.getFloat());
+ break;
+ case DOUBLE:
+ columnBuilder.writeDouble(maxResult.getDouble());
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MaxValue: %s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void reset() {
+ initResult = false;
+ this.maxResult.reset();
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {maxResult.getDataType()};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return maxResult.getDataType();
+ }
+
+ private void addIntInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ updateIntResult(column[1].getInt(i));
+ }
+ }
+ }
+
+ private void updateIntResult(int maxVal) {
+ if (!initResult || maxVal > maxResult.getInt()) {
+ initResult = true;
+ maxResult.setInt(maxVal);
+ }
+ }
+
+ private void addLongInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ updateLongResult(column[1].getLong(i));
+ }
+ }
+ }
+
+ private void updateLongResult(long maxVal) {
+ if (!initResult || maxVal > maxResult.getLong()) {
+ initResult = true;
+ maxResult.setLong(maxVal);
+ }
+ }
+
+ private void addFloatInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ updateFloatResult(column[1].getFloat(i));
+ }
+ }
+ }
+
+ private void updateFloatResult(float maxVal) {
+ if (!initResult || maxVal > maxResult.getFloat()) {
+ initResult = true;
+ maxResult.setFloat(maxVal);
+ }
+ }
+
+ private void addDoubleInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ updateDoubleResult(column[1].getDouble(i));
+ }
+ }
+ }
+
+ private void updateDoubleResult(double maxVal) {
+ if (!initResult || maxVal > maxResult.getDouble()) {
+ initResult = true;
+ maxResult.setDouble(maxVal);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
new file mode 100644
index 0000000000..a6af85093d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MinTimeAccumulator implements Accumulator {
+
+ protected boolean hasCandidateResult;
+ protected long minTime = Long.MAX_VALUE;
+
+ public MinTimeAccumulator() {}
+
+ // Column should be like: | Time | Value |
+ // Value is used to judge isNull()
+ @Override
+ public void addInput(Column[] column, TimeRange timeRange) {
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateMinTime(curTime);
+ break;
+ }
+ }
+ }
+
+ // partialResult should be like: | partialMinTimeValue |
+ @Override
+ public void addIntermediate(Column[] partialResult) {
+ checkArgument(partialResult.length == 1, "partialResult of MinTime should be 1");
+ updateMinTime(partialResult[0].getLong(0));
+ }
+
+ @Override
+ public void addStatistics(Statistics statistics) {
+ updateMinTime(statistics.getStartTime());
+ }
+
+ // finalResult should be single column, like: | finalMinTime |
+ @Override
+ public void setFinal(Column finalResult) {
+ minTime = finalResult.getLong(0);
+ }
+
+ // columnBuilder should be single in minTimeAccumulator
+ @Override
+ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+ checkArgument(columnBuilders.length == 1, "partialResult of MinTime should be 1");
+ columnBuilders[0].writeLong(minTime);
+ }
+
+ @Override
+ public void outputFinal(ColumnBuilder columnBuilder) {
+ columnBuilder.writeLong(minTime);
+ }
+
+ @Override
+ public void reset() {
+ hasCandidateResult = false;
+ this.minTime = Long.MAX_VALUE;
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return hasCandidateResult;
+ }
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {TSDataType.INT64};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return TSDataType.INT64;
+ }
+
+ protected void updateMinTime(long curTime) {
+ hasCandidateResult = true;
+ minTime = Math.min(minTime, curTime);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
index 4ee69b14e7..e97fa3ca70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
@@ -16,42 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process;
-import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+package org.apache.iotdb.db.mpp.aggregation;
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class AggregateOperator implements ProcessOperator {
+// MinTime accumulator variant whose addInput visits every row of the input instead of stopping at
+// the first qualifying one, and which therefore never reports an early final result.
+public class MinTimeDescAccumulator extends MinTimeAccumulator {
@Override
- public OperatorContext getOperatorContext() {
- return null;
+ public void addInput(Column[] column, TimeRange timeRange) {
+ // Column should be like: | Time | Value |; Value is only used to judge isNull().
+ for (int i = 0; i < column[0].getPositionCount(); i++) {
+ long curTime = column[0].getLong(i);
+ // Rows whose value is null must not contribute a timestamp, as in MinTimeAccumulator.
+ if (curTime >= timeRange.getMin() && curTime < timeRange.getMax() && !column[1].isNull(i)) {
+ updateMinTime(curTime);
+ }
+ }
}
@Override
- public ListenableFuture<Void> isBlocked() {
- return ProcessOperator.super.isBlocked();
- }
-
- @Override
- public TsBlock next() {
- return null;
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
-
- @Override
- public void close() throws Exception {
- ProcessOperator.super.close();
- }
-
- @Override
- public boolean isFinished() {
+ public boolean hasFinalResult() {
+ // addInput never stops early in this class, so no early final result is reported.
return false;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
new file mode 100644
index 0000000000..8b24de07ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class MinValueAccumulator implements Accumulator {
+
+ private TSDataType seriesDataType;
+ private TsPrimitiveType minResult;
+ private boolean initResult = false;
+
+ public MinValueAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ this.minResult = TsPrimitiveType.getByType(seriesDataType);
+ }
+
+ // Column should be like: | Time | Value |
+ @Override
+ public void addInput(Column[] column, TimeRange timeRange) {
+ switch (seriesDataType) {
+ case INT32:
+ addIntInput(column, timeRange);
+ break;
+ case INT64:
+ addLongInput(column, timeRange);
+ break;
+ case FLOAT:
+ addFloatInput(column, timeRange);
+ break;
+ case DOUBLE:
+ addDoubleInput(column, timeRange);
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MinValue: %s", seriesDataType));
+ }
+ }
+
+ // partialResult should be like: | partialMinValue1 |
+ @Override
+ public void addIntermediate(Column[] partialResult) {
+ checkArgument(partialResult.length == 1, "partialResult of MinValue should be 1");
+ switch (seriesDataType) {
+ case INT32:
+ updateIntResult(partialResult[0].getInt(0));
+ break;
+ case INT64:
+ updateLongResult(partialResult[0].getLong(0));
+ break;
+ case FLOAT:
+ updateFloatResult(partialResult[0].getFloat(0));
+ break;
+ case DOUBLE:
+ updateDoubleResult(partialResult[0].getDouble(0));
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MinValue: %s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void addStatistics(Statistics statistics) {
+ switch (seriesDataType) {
+ case INT32:
+ updateIntResult((int) statistics.getMinValue());
+ break;
+ case INT64:
+ updateLongResult((long) statistics.getMinValue());
+ break;
+ case FLOAT:
+ updateFloatResult((float) statistics.getMinValue());
+ break;
+ case DOUBLE:
+ updateDoubleResult((double) statistics.getMinValue());
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MinValue: %s", seriesDataType));
+ }
+ }
+
+ // finalResult should be single column, like: | finalCountValue |
+ @Override
+ public void setFinal(Column finalResult) {
+ minResult.setObject(finalResult.getObject(0));
+ }
+
+ // columnBuilder should be single in MinValueAccumulator
+ @Override
+ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+ checkArgument(columnBuilders.length == 1, "partialResult of MinValue should be 1");
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilders[0].writeInt(minResult.getInt());
+ break;
+ case INT64:
+ columnBuilders[0].writeLong(minResult.getLong());
+ break;
+ case FLOAT:
+ columnBuilders[0].writeFloat(minResult.getFloat());
+ break;
+ case DOUBLE:
+ columnBuilders[0].writeDouble(minResult.getDouble());
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MinValue: %s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void outputFinal(ColumnBuilder columnBuilder) {
+ switch (seriesDataType) {
+ case INT32:
+ columnBuilder.writeInt(minResult.getInt());
+ break;
+ case INT64:
+ columnBuilder.writeLong(minResult.getLong());
+ break;
+ case FLOAT:
+ columnBuilder.writeFloat(minResult.getFloat());
+ break;
+ case DOUBLE:
+ columnBuilder.writeDouble(minResult.getDouble());
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in MinValue: %s", seriesDataType));
+ }
+ }
+
+ @Override
+ public void reset() {
+ initResult = false;
+ this.minResult.reset();
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {minResult.getDataType()};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return minResult.getDataType();
+ }
+
+ private void addIntInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ updateIntResult(column[1].getInt(i));
+ }
+ }
+ }
+
+ private void updateIntResult(int minVal) {
+ if (!initResult || minVal < minResult.getInt()) {
+ initResult = true;
+ minResult.setInt(minVal);
+ }
+ }
+
+ private void addLongInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ updateLongResult(column[1].getLong(i));
+ }
+ }
+ }
+
+ private void updateLongResult(long minVal) {
+ if (!initResult || minVal < minResult.getLong()) {
+ initResult = true;
+ minResult.setLong(minVal);
+ }
+ }
+
+ private void addFloatInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ updateFloatResult(column[1].getFloat(i));
+ }
+ }
+ }
+
+ private void updateFloatResult(float minVal) {
+ if (!initResult || minVal < minResult.getFloat()) {
+ initResult = true;
+ minResult.setFloat(minVal);
+ }
+ }
+
+ private void addDoubleInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ if (!column[1].isNull(i)) {
+ updateDoubleResult(column[1].getDouble(i));
+ }
+ }
+ }
+
+ private void updateDoubleResult(double minVal) {
+ if (!initResult || minVal < minResult.getDouble()) {
+ initResult = true;
+ minResult.setDouble(minVal);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
new file mode 100644
index 0000000000..132fb017a9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Accumulator for the Sum aggregation over INT32, INT64, FLOAT and DOUBLE series. The running sum
+ * is kept, and emitted, as a {@code double} regardless of the series type.
+ */
+public class SumAccumulator implements Accumulator {
+
+  private final TSDataType seriesDataType;
+  private double sumValue = 0;
+
+  public SumAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(column, timeRange);
+        break;
+      case INT64:
+        addLongInput(column, timeRange);
+        break;
+      case FLOAT:
+        addFloatInput(column, timeRange);
+        break;
+      case DOUBLE:
+        addDoubleInput(column, timeRange);
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        // The message names SUM: this is the Sum accumulator, not AVG.
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation SUM : %s", seriesDataType));
+    }
+  }
+
+  // partialResult should be like: | partialSumValue1 |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of Sum should be 1");
+    sumValue += partialResult[0].getDouble(0);
+  }
+
+  // IntegerStatistics is read through its long sum; every other statistics type through its
+  // double sum.
+  @Override
+  public void addStatistics(Statistics statistics) {
+    if (statistics instanceof IntegerStatistics) {
+      sumValue += statistics.getSumLongValue();
+    } else {
+      sumValue += statistics.getSumDoubleValue();
+    }
+  }
+
+  // finalResult should be single column, like: | finalSumValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    sumValue = finalResult.getDouble(0);
+  }
+
+  // columnBuilders should contain a single builder in SumAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult of Sum should be 1");
+    columnBuilders[0].writeDouble(sumValue);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeDouble(sumValue);
+  }
+
+  @Override
+  public void reset() {
+    this.sumValue = 0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.DOUBLE};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.DOUBLE;
+  }
+
+  // The four add*Input methods share one shape: walk the rows, stop at the first timestamp
+  // outside [min, max), and add every non-null value to the running sum.
+  // NOTE(review): stopping (rather than skipping) presumably relies on the caller handing in rows
+  // that start inside the window - confirm against the caller.
+  private void addIntInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        sumValue += column[1].getInt(i);
+      }
+    }
+  }
+
+  private void addLongInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        sumValue += column[1].getLong(i);
+      }
+    }
+  }
+
+  private void addFloatInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        sumValue += column[1].getFloat(i);
+      }
+    }
+  }
+
+  private void addDoubleInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      if (!column[1].isNull(i)) {
+        sumValue += column[1].getDouble(i);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
index 4ee69b14e7..71e7817b94 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
@@ -18,16 +18,31 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+
public class AggregateOperator implements ProcessOperator {
+ private final OperatorContext operatorContext;
+ private final List<Aggregator> aggregators;
+ private final List<Operator> children;
+
+ public AggregateOperator(
+ OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+ this.operatorContext = operatorContext; // returned later by getOperatorContext()
+ this.aggregators = aggregators; // stored by reference; no defensive copy is made
+ this.children = children; // stored by reference; no defensive copy is made
+ }
+
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext; // the context handed to the constructor
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index 04658826c2..a90dcfd0bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -114,7 +114,7 @@ public class DeviceViewOperator implements ProcessOperator {
}
// construct device column
ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1);
- deviceColumnBuilder.writeObject(new Binary(getCurDeviceName()));
+ deviceColumnBuilder.writeBinary(new Binary(getCurDeviceName()));
newValueColumns[0] =
new RunLengthEncodedColumn(deviceColumnBuilder.build(), tsBlock.getPositionCount());
// construct other null columns
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index 842b907bcc..e5d657713f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@ -19,18 +19,15 @@
package org.apache.iotdb.db.mpp.execution.operator.source;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.db.query.aggregation.AggregateResult;
-import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -44,9 +41,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* This operator is responsible for doing the aggregation calculation for one series based on global
@@ -62,15 +59,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
private final PlanNodeId sourceId;
private final SeriesScanUtil seriesScanUtil;
private final boolean ascending;
- private List<AggregateResult> aggregateResultList;
+ // Aggregators in SeriesAggregateScanOperator are still treated as an inputRaw step,
+ // but when fed statistics they invoke a separate method, processStatistics().
+ private List<Aggregator> aggregators;
private ITimeRangeIterator timeRangeIterator;
// current interval of aggregation window [curStartTime, curEndTime)
private TimeRange curTimeRange;
- private TsBlockSingleColumnIterator preCachedData;
- // used for resetting the preCachedData to the last read index
- private int lastReadIndex;
+ private TsBlock preCachedData;
private TsBlockBuilder tsBlockBuilder;
private TsBlock resultTsBlock;
@@ -82,7 +79,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
PartialPath seriesPath,
Set<String> allSensors,
OperatorContext context,
- List<AggregationType> aggregateFuncList,
+ List<Aggregator> aggregators,
Filter timeFilter,
boolean ascending,
GroupByTimeParameter groupByTimeParameter) {
@@ -98,21 +95,12 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
timeFilter,
null,
ascending);
- aggregateResultList = new ArrayList<>(aggregateFuncList.size());
- for (AggregationType aggregationType : aggregateFuncList) {
- aggregateResultList.add(
- AggregateResultFactory.getAggrResultByType(
- aggregationType,
- seriesPath.getSeriesType(),
- seriesScanUtil.getOrderUtils().getAscending()));
+ this.aggregators = aggregators;
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (Aggregator aggregator : aggregators) {
+ dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
}
- tsBlockBuilder =
- new TsBlockBuilder(
- aggregateFuncList.stream()
- .map(
- functionType ->
- SchemaUtils.getSeriesTypeByPath(seriesPath, functionType.name()))
- .collect(Collectors.toList()));
+ tsBlockBuilder = new TsBlockBuilder(dataTypes);
this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
}
@@ -169,8 +157,9 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
curTimeRange = timeRangeIterator.nextTimeRange();
// 1. Clear previous aggregation result
- for (AggregateResult result : aggregateResultList) {
- result.reset();
+ for (Aggregator aggregator : aggregators) {
+ aggregator.reset();
+ aggregator.setTimeRange(curTimeRange);
}
// 2. Calculate aggregation result based on current time window
@@ -226,14 +215,19 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
}
private void updateResultTsBlockUsingAggregateResult() {
- // TODO AVG
tsBlockBuilder.reset();
TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
// Use start time of current time range as time column
timeColumnBuilder.writeLong(curTimeRange.getMin());
ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
- for (int i = 0; i < aggregateResultList.size(); i++) {
- columnBuilders[i].writeObject(aggregateResultList.get(i).getResult());
+ int columnIndex = 0;
+ for (Aggregator aggregator : aggregators) {
+ ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+ columnBuilder[0] = columnBuilders[columnIndex++];
+ if (columnBuilder.length > 1) {
+ columnBuilder[1] = columnBuilders[columnIndex++];
+ }
+ aggregator.outputResult(columnBuilder);
}
tsBlockBuilder.declarePosition();
resultTsBlock = tsBlockBuilder.build();
@@ -273,41 +267,33 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
}
@SuppressWarnings("squid:S3776")
- private void calcFromBatch(TsBlockSingleColumnIterator blockIterator, TimeRange curTimeRange)
- throws IOException {
+ private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
// check if the batchData does not contain points in current interval
- if (!satisfied(blockIterator, curTimeRange)) {
+ if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) {
return;
}
- for (AggregateResult result : aggregateResultList) {
+ // skip points that cannot be calculated
+ tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange);
+
+ for (Aggregator aggregator : aggregators) {
// current agg method has been calculated
- if (result.hasFinalResult()) {
+ if (aggregator.hasFinalResult()) {
continue;
}
- // lazy reset batch data for calculation
- blockIterator.setRowIndex(lastReadIndex);
- // skip points that cannot be calculated
- skipOutOfTimeRangePoints(blockIterator, curTimeRange);
-
- if (blockIterator.hasNext()) {
- result.updateResultFromPageData(
- blockIterator, curTimeRange.getMin(), curTimeRange.getMax());
- }
- }
- // reset the last position to current Index
- lastReadIndex = blockIterator.getRowIndex();
+ aggregator.processTsBlock(tsBlock);
+ }
// can calc for next interval
- if (blockIterator.hasNext()) {
- preCachedData = blockIterator;
+ if (tsBlock.getTsBlockSingleColumnIterator().hasNext()) {
+ preCachedData = tsBlock;
}
}
// skip points that cannot be calculated
- private void skipOutOfTimeRangePoints(
- TsBlockSingleColumnIterator tsBlockIterator, TimeRange curTimeRange) {
+ private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange curTimeRange) {
+ TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
if (ascending) {
while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
tsBlockIterator.next();
@@ -317,9 +303,11 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
tsBlockIterator.next();
}
}
+ return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
}
- private boolean satisfied(TsBlockSingleColumnIterator tsBlockIterator, TimeRange timeRange) {
+ private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) {
+ TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
return false;
}
@@ -332,15 +320,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
if (!ascending
&& (tsBlockIterator.getStartTime() >= timeRange.getMax()
|| tsBlockIterator.currentTime() < timeRange.getMin())) {
- preCachedData = tsBlockIterator;
+ preCachedData = tsBlock;
return false;
}
return true;
}
private boolean isEndCalc() {
- for (AggregateResult result : aggregateResultList) {
- if (!result.hasFinalResult()) {
+ for (Aggregator aggregator : aggregators) {
+ if (!aggregator.hasFinalResult()) {
return false;
}
}
@@ -375,23 +363,23 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
}
// calc from page data
- TsBlockSingleColumnIterator tsBlockIterator =
- seriesScanUtil.nextPage().getTsBlockSingleColumnIterator();
+ TsBlock tsBlock = seriesScanUtil.nextPage();
+ TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
continue;
}
// reset the last position to current Index
- lastReadIndex = tsBlockIterator.getRowIndex();
+ // lastReadIndex = tsBlockIterator.getRowIndex();
// stop calc and cache the current batchData
if (ascending && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
- preCachedData = tsBlockIterator;
+ preCachedData = tsBlock;
return true;
}
// calc from batch data
- calcFromBatch(tsBlockIterator, curTimeRange);
+ calcFromBatch(tsBlock, curTimeRange);
// judge whether the calculation finished
if (isEndCalc()
@@ -432,15 +420,12 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
}
private void calcFromStatistics(Statistics statistics) {
- try {
- for (AggregateResult result : aggregateResultList) {
- if (result.hasFinalResult()) {
- continue;
- }
- result.updateResultFromStatistics(statistics);
+ for (int i = 0; i < aggregators.size(); i++) {
+ Aggregator aggregator = aggregators.get(i);
+ if (aggregator.hasFinalResult()) {
+ continue;
}
- } catch (QueryProcessException e) {
- throw new RuntimeException("Error while updating result using statistics", e);
+ aggregator.processStatistics(statistics);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index fb876a35ac..4f42b6871c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -321,7 +321,7 @@ public class LocalExecutionPlanner {
seriesPath,
node.getAllSensors(),
operatorContext,
- node.getAggregateFuncList(),
+ null,
node.getTimeFilter(),
ascending,
node.getGroupByTimeParameter());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index f9d2006473..e7f0b42e36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -59,6 +59,10 @@ public class AggregationDescriptor {
return aggregationType;
}
+ public AggregationStep getStep() {
+ return step;
+ }
+
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
step.serialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/InputLocation.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/InputLocation.java
index 0350c7db0f..d7fcca9e3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/InputLocation.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/InputLocation.java
@@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
import java.util.Objects;
public class InputLocation {
- // which input tsblock
+ // which input tsBlock
private final int tsBlockIndex;
- // which value column of that tsblock
+ // which value column of that tsBlock
private final int valueColumnIndex;
public InputLocation(int tsBlockIndex, int valueColumnIndex) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
new file mode 100644
index 0000000000..6fe08cd345
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation;
+
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AccumulatorTest {
+
+ private TsBlock rawData;
+ private Statistics statistics;
+ private TimeRange defaultTimeRange = new TimeRange(0, Long.MAX_VALUE);
+
+ @Before
+ public void setUp() {
+ initInputTsBlock();
+ }
+
+ private void initInputTsBlock() {
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE);
+ TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ for (int i = 0; i < 100; i++) {
+ timeColumnBuilder.writeLong(i);
+ columnBuilders[0].writeDouble(i * 1.0);
+ tsBlockBuilder.declarePosition();
+ }
+ rawData = tsBlockBuilder.build();
+
+ statistics = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics.update(100L, 100d);
+ }
+
+ @Test
+ public void avgAccumulatorTest() {
+ Accumulator avgAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.AVG, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.INT64, avgAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, avgAccumulator.getIntermediateType()[1]);
+ Assert.assertEquals(TSDataType.DOUBLE, avgAccumulator.getFinalType());
+
+ avgAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertFalse(avgAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[2];
+ intermediateResult[0] = new LongColumnBuilder(null, 1);
+ intermediateResult[1] = new DoubleColumnBuilder(null, 1);
+ avgAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(100, intermediateResult[0].build().getLong(0));
+ Assert.assertEquals(4950d, intermediateResult[1].build().getDouble(0), 0.001);
+
+ // add intermediate result as input
+ avgAccumulator.addIntermediate(
+ new Column[] {intermediateResult[0].build(), intermediateResult[1].build()});
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ avgAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(49.5d, finalResult.build().getDouble(0), 0.001);
+
+ avgAccumulator.reset();
+ avgAccumulator.addStatistics(statistics);
+ finalResult = new DoubleColumnBuilder(null, 1);
+ avgAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+ }
+
+ @Test
+ public void countAccumulatorTest() {
+ Accumulator countAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.INT64, countAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.INT64, countAccumulator.getFinalType());
+
+ countAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertFalse(countAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new LongColumnBuilder(null, 1);
+ countAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(100, intermediateResult[0].build().getLong(0));
+
+ // add intermediate result as input
+ countAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+ ColumnBuilder finalResult = new LongColumnBuilder(null, 1);
+ countAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(200, finalResult.build().getLong(0));
+
+ countAccumulator.reset();
+ countAccumulator.addStatistics(statistics);
+ finalResult = new LongColumnBuilder(null, 1);
+ countAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(1, finalResult.build().getLong(0));
+ }
+
+ @Test
+ public void extremeAccumulatorTest() {
+ Accumulator extremeAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.EXTREME, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getFinalType());
+
+ extremeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertFalse(extremeAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(99d, intermediateResult[0].build().getDouble(0), 0.001);
+
+ // add intermediate result as input
+ extremeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(99d, finalResult.build().getDouble(0), 0.001);
+
+ extremeAccumulator.reset();
+ extremeAccumulator.addStatistics(statistics);
+ finalResult = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+ }
+
+ @Test
+ public void firstValueAccumulatorTest() {
+ Accumulator firstValueAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.FIRST_VALUE, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.DOUBLE, firstValueAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.INT64, firstValueAccumulator.getIntermediateType()[1]);
+ Assert.assertEquals(TSDataType.DOUBLE, firstValueAccumulator.getFinalType());
+
+ firstValueAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertTrue(firstValueAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[2];
+ intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+ intermediateResult[1] = new LongColumnBuilder(null, 1);
+ firstValueAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(0d, intermediateResult[0].build().getDouble(0), 0.001);
+ Assert.assertEquals(0L, intermediateResult[1].build().getLong(0));
+
+ // add intermediate result as input
+ firstValueAccumulator.addIntermediate(
+ new Column[] {intermediateResult[0].build(), intermediateResult[1].build()});
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ firstValueAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(0L, finalResult.build().getDouble(0), 0.001);
+
+ firstValueAccumulator.reset();
+ firstValueAccumulator.addStatistics(statistics);
+ finalResult = new DoubleColumnBuilder(null, 1);
+ firstValueAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+ }
+
+ @Test
+ public void lastValueAccumulatorTest() {
+ Accumulator lastValueAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.LAST_VALUE, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.DOUBLE, lastValueAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.INT64, lastValueAccumulator.getIntermediateType()[1]);
+ Assert.assertEquals(TSDataType.DOUBLE, lastValueAccumulator.getFinalType());
+
+ lastValueAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[2];
+ intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+ intermediateResult[1] = new LongColumnBuilder(null, 1);
+ lastValueAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(99d, intermediateResult[0].build().getDouble(0), 0.001);
+ Assert.assertEquals(99L, intermediateResult[1].build().getLong(0));
+
+ // add intermediate result as input
+ lastValueAccumulator.addIntermediate(
+ new Column[] {intermediateResult[0].build(), intermediateResult[1].build()});
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ lastValueAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(99L, finalResult.build().getDouble(0), 0.001);
+
+ lastValueAccumulator.reset();
+ lastValueAccumulator.addStatistics(statistics);
+ finalResult = new DoubleColumnBuilder(null, 1);
+ lastValueAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+ }
+
+ @Test
+ public void maxTimeAccumulatorTest() {
+ Accumulator maxTimeAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.MAX_TIME, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.INT64, maxTimeAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.INT64, maxTimeAccumulator.getFinalType());
+
+ maxTimeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertFalse(maxTimeAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new LongColumnBuilder(null, 1);
+ maxTimeAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(99, intermediateResult[0].build().getLong(0));
+
+ // add intermediate result as input
+ maxTimeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+ ColumnBuilder finalResult = new LongColumnBuilder(null, 1);
+ maxTimeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(99, finalResult.build().getLong(0));
+
+ maxTimeAccumulator.reset();
+ maxTimeAccumulator.addStatistics(statistics);
+ finalResult = new LongColumnBuilder(null, 1);
+ maxTimeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100, finalResult.build().getLong(0));
+ }
+
+ @Test
+ public void minTimeAccumulatorTest() {
+ Accumulator minTimeAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.MIN_TIME, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.INT64, minTimeAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.INT64, minTimeAccumulator.getFinalType());
+
+ minTimeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertTrue(minTimeAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new LongColumnBuilder(null, 1);
+ minTimeAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(0, intermediateResult[0].build().getLong(0));
+
+ // add intermediate result as input
+ minTimeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+ ColumnBuilder finalResult = new LongColumnBuilder(null, 1);
+ minTimeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(0, finalResult.build().getLong(0));
+
+ minTimeAccumulator.reset();
+ minTimeAccumulator.addStatistics(statistics);
+ finalResult = new LongColumnBuilder(null, 1);
+ minTimeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100, finalResult.build().getLong(0));
+ }
+
+ @Test
+ public void maxValueAccumulatorTest() {
+ Accumulator extremeAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.MAX_VALUE, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getFinalType());
+
+ extremeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertFalse(extremeAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(99d, intermediateResult[0].build().getDouble(0), 0.001);
+
+ // add intermediate result as input
+ extremeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(99d, finalResult.build().getDouble(0), 0.001);
+
+ extremeAccumulator.reset();
+ extremeAccumulator.addStatistics(statistics);
+ finalResult = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+ }
+
+ @Test
+ public void minValueAccumulatorTest() {
+ Accumulator extremeAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.MIN_VALUE, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, extremeAccumulator.getFinalType());
+
+ extremeAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertFalse(extremeAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(0d, intermediateResult[0].build().getDouble(0), 0.001);
+
+ // add intermediate result as input
+ extremeAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(0d, finalResult.build().getDouble(0), 0.001);
+
+ extremeAccumulator.reset();
+ extremeAccumulator.addStatistics(statistics);
+ finalResult = new DoubleColumnBuilder(null, 1);
+ extremeAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+ }
+
+ @Test
+ public void sumAccumulatorTest() {
+ Accumulator sumAccumulator =
+ AccumulatorFactory.createAccumulator(AggregationType.SUM, TSDataType.DOUBLE, true);
+ Assert.assertEquals(TSDataType.DOUBLE, sumAccumulator.getIntermediateType()[0]);
+ Assert.assertEquals(TSDataType.DOUBLE, sumAccumulator.getFinalType());
+
+ sumAccumulator.addInput(rawData.getTimeAndValueColumn(0), defaultTimeRange);
+ Assert.assertFalse(sumAccumulator.hasFinalResult());
+ ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+ intermediateResult[0] = new DoubleColumnBuilder(null, 1);
+ sumAccumulator.outputIntermediate(intermediateResult);
+ Assert.assertEquals(4950d, intermediateResult[0].build().getDouble(0), 0.001);
+
+ // add intermediate result as input
+ sumAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+ ColumnBuilder finalResult = new DoubleColumnBuilder(null, 1);
+ sumAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(9900d, finalResult.build().getDouble(0), 0.001);
+
+ sumAccumulator.reset();
+ sumAccumulator.addStatistics(statistics);
+ finalResult = new DoubleColumnBuilder(null, 1);
+ sumAccumulator.outputFinal(finalResult);
+ Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregateScanOperatorTest.java
index 7b3a158c7e..36acc04562 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregateScanOperatorTest.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -33,6 +35,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -86,9 +89,12 @@ public class SeriesAggregateScanOperatorTest {
@Test
public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+ List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), null, true, null);
+ initSeriesAggregateScanOperator(aggregators, null, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -103,8 +109,11 @@ public class SeriesAggregateScanOperatorTest {
List<AggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(AggregationType.COUNT);
aggregationTypes.add(AggregationType.SUM);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+ initSeriesAggregateScanOperator(aggregators, null, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -124,8 +133,40 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.MAX_TIME);
aggregationTypes.add(AggregationType.MAX_VALUE);
aggregationTypes.add(AggregationType.MIN_VALUE);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+ initSeriesAggregateScanOperator(aggregators, null, true, null);
+ int count = 0;
+ while (seriesAggregateScanOperator.hasNext()) {
+ TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+ assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
+ assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
+ assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
+ assertEquals(499, resultTsBlock.getColumn(3).getLong(0));
+ assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+ assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+ count++;
+ }
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc()
+ throws IllegalPathException {
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ aggregationTypes.add(AggregationType.LAST_VALUE);
+ aggregationTypes.add(AggregationType.MIN_TIME);
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregationTypes.add(AggregationType.MAX_VALUE);
+ aggregationTypes.add(AggregationType.MIN_VALUE);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, false)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
+ SeriesAggregateScanOperator seriesAggregateScanOperator =
+ initSeriesAggregateScanOperator(aggregators, null, false, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -142,10 +183,13 @@ public class SeriesAggregateScanOperatorTest {
@Test
public void testAggregationWithTimeFilter1() throws IllegalPathException {
+ List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
Filter timeFilter = TimeFilter.gtEq(120);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+ initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -158,9 +202,12 @@ public class SeriesAggregateScanOperatorTest {
@Test
public void testAggregationWithTimeFilter2() throws IllegalPathException {
Filter timeFilter = TimeFilter.ltEq(379);
+ List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+ initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -173,9 +220,12 @@ public class SeriesAggregateScanOperatorTest {
@Test
public void testAggregationWithTimeFilter3() throws IllegalPathException {
Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
+ List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+ initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -194,9 +244,12 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.MAX_TIME);
aggregationTypes.add(AggregationType.MAX_VALUE);
aggregationTypes.add(AggregationType.MIN_VALUE);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, timeFilter, true, null);
+ initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -215,9 +268,12 @@ public class SeriesAggregateScanOperatorTest {
public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
int[] result = new int[] {100, 100, 100, 100};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+ List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -233,12 +289,12 @@ public class SeriesAggregateScanOperatorTest {
int[] result = new int[] {0, 80, 100, 80};
Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+ List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT),
- timeFilter,
- true,
- groupByTimeParameter);
+ initSeriesAggregateScanOperator(aggregators, timeFilter, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -264,8 +320,11 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.MAX_VALUE);
aggregationTypes.add(AggregationType.MIN_VALUE);
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -279,13 +338,49 @@ public class SeriesAggregateScanOperatorTest {
assertEquals(4, count);
}
+ /**
+ * Runs FIRST_VALUE, LAST_VALUE, MAX_VALUE and MIN_VALUE with a group-by-time parameter while
+ * scanning in descending order, and checks every returned block against the expected matrix
+ * (row = aggregation function, column = window index in ascending time order).
+ */
+ @Test
+ public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
+ int[][] result =
+ new int[][] {
+ {20000, 20100, 10200, 10300},
+ {20099, 20199, 299, 399},
+ {20099, 20199, 10259, 10379},
+ {20000, 20100, 260, 380}
+ };
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ aggregationTypes.add(AggregationType.LAST_VALUE);
+ aggregationTypes.add(AggregationType.MAX_VALUE);
+ aggregationTypes.add(AggregationType.MIN_VALUE);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, false)
+ .forEach(accumulator -> aggregators.add(new Aggregator(accumulator, AggregationStep.SINGLE)));
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+ SeriesAggregateScanOperator scanOperator =
+ initSeriesAggregateScanOperator(aggregators, null, false, groupByTimeParameter);
+ int count = 0;
+ while (scanOperator.hasNext()) {
+ TsBlock block = scanOperator.next();
+ // blocks arrive newest-first, so the n-th block maps to window (3 - n)
+ int window = 3 - count;
+ assertEquals(100 * window, block.getTimeColumn().getLong(0));
+ for (int column = 0; column < 4; column++) {
+ assertEquals(result[column][window], block.getColumn(column).getInt(0));
+ }
+ count++;
+ }
+ assertEquals(4, count);
+ }
+
@Test
public void testGroupBySlidingTimeWindow() throws IllegalPathException {
int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
+ List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -301,9 +396,12 @@ public class SeriesAggregateScanOperatorTest {
int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
+ List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -330,8 +428,11 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.MAX_VALUE);
aggregationTypes.add(AggregationType.MIN_VALUE);
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
+ List<Aggregator> aggregators = new ArrayList<>();
+ AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+ .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -346,7 +447,7 @@ public class SeriesAggregateScanOperatorTest {
}
public SeriesAggregateScanOperator initSeriesAggregateScanOperator(
- List<AggregationType> aggregateFuncList,
+ List<Aggregator> aggregators,
Filter timeFilter,
boolean ascending,
GroupByTimeParameter groupByTimeParameter)
@@ -373,7 +474,7 @@ public class SeriesAggregateScanOperatorTest {
measurementPath,
allSensors,
fragmentInstanceContext.getOperatorContexts().get(0),
- aggregateFuncList,
+ aggregators,
timeFilter,
ascending,
groupByTimeParameter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index a5657d74fe..d3ffe65ff3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -92,12 +92,6 @@ public class TsBlock {
}
}
- public boolean hasNext() {
- return false;
- }
-
- public void next() {}
-
public int getPositionCount() {
return positionCount;
}
@@ -170,6 +164,23 @@ public class TsBlock {
return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
}
+ /**
+ * Creates a temporary view of this TsBlock that starts at {@code fromIndex}. The view is built
+ * from {@code subColumn(fromIndex)} of the time column and of every value column, so the
+ * underlying arrays are reused with a different offset instead of being copied. It can be used
+ * where you want to skip some points when getting an iterator.
+ *
+ * @param fromIndex index of the first position to keep, within [0, positionCount]
+ * @return a TsBlock wrapping the sub-columns
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ public TsBlock subTsBlock(int fromIndex) {
+ // a negative index would let the view reach positions in front of this block
+ if (fromIndex < 0) {
+ throw new IllegalArgumentException("FromIndex of subTsBlock cannot be negative.");
+ }
+ if (fromIndex > positionCount) {
+ throw new IllegalArgumentException("FromIndex of subTsBlock cannot over positionCount.");
+ }
+ TimeColumn subTimeColumn = (TimeColumn) timeColumn.subColumn(fromIndex);
+ Column[] subValueColumns = new Column[valueColumns.length];
+ for (int i = 0; i < subValueColumns.length; i++) {
+ subValueColumns[i] = valueColumns[i].subColumn(fromIndex);
+ }
+ return new TsBlock(subTimeColumn, subValueColumns);
+ }
+
public long getTimeByIndex(int index) {
return timeColumn.getLong(index);
}
@@ -186,6 +197,21 @@ public class TsBlock {
return valueColumns[columnIndex];
}
+ /**
+ * Returns a two-element array holding the time column followed by the value column at {@code
+ * columnIndex}.
+ */
+ public Column[] getTimeAndValueColumn(int columnIndex) {
+ return new Column[] {getTimeColumn(), getColumn(columnIndex)};
+ }
+
+ /**
+ * Returns the value columns selected by {@code columnIndexes}, in the order the indexes are
+ * given.
+ */
+ public Column[] getColumns(int[] columnIndexes) {
+ Column[] selected = new Column[columnIndexes.length];
+ int target = 0;
+ for (int columnIndex : columnIndexes) {
+ selected[target++] = valueColumns[columnIndex];
+ }
+ return selected;
+ }
+
public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator() {
return new TsBlockSingleColumnIterator(0);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index f6b2c50e66..80fba8b7d2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -123,6 +123,15 @@ public class BinaryColumn implements Column {
return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, values);
}
+ /**
+ * Returns a view of this column that starts at {@code fromIndex}. The view shares {@code values}
+ * and {@code valueIsNull} with this column and only uses a shifted array offset.
+ *
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ @Override
+ public Column subColumn(int fromIndex) {
+ // a negative index would make the view start before this column's own offset
+ if (fromIndex < 0 || fromIndex > positionCount) {
+ throw new IllegalArgumentException("fromIndex is not valid");
+ }
+ return new BinaryColumn(
+ arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+ }
+
@Override
public void reverse() {
for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 23dd445a34..4fab293acb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -122,6 +122,15 @@ public class BooleanColumn implements Column {
return new BooleanColumn(positionOffset + arrayOffset, length, valueIsNull, values);
}
+ /**
+ * Returns a view of this column that starts at {@code fromIndex}. The view shares {@code values}
+ * and {@code valueIsNull} with this column and only uses a shifted array offset.
+ *
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ @Override
+ public Column subColumn(int fromIndex) {
+ // a negative index would make the view start before this column's own offset
+ if (fromIndex < 0 || fromIndex > positionCount) {
+ throw new IllegalArgumentException("fromIndex is not valid");
+ }
+ return new BooleanColumn(
+ arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+ }
+
@Override
public void reverse() {
for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
index c9ad6eddb3..2e796a5f68 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -103,6 +103,12 @@ public interface Column {
*/
Column getRegion(int positionOffset, int length);
+ /**
+ * This method will create a temporary view of the original column, which will reuse the array of
+ * the column but with a different array offset.
+ *
+ * @param fromIndex position in this column at which the view starts
+ * @return a column exposing the positions from {@code fromIndex} to the end of this column
+ */
+ Column subColumn(int fromIndex);
+
/** reverse the column */
void reverse();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index a9e64e4224..25c97b188f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -122,6 +122,15 @@ public class DoubleColumn implements Column {
return new DoubleColumn(positionOffset + arrayOffset, length, valueIsNull, values);
}
+ /**
+ * Returns a view of this column that starts at {@code fromIndex}. The view shares {@code values}
+ * and {@code valueIsNull} with this column and only uses a shifted array offset.
+ *
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ @Override
+ public Column subColumn(int fromIndex) {
+ // a negative index would make the view start before this column's own offset
+ if (fromIndex < 0 || fromIndex > positionCount) {
+ throw new IllegalArgumentException("fromIndex is not valid");
+ }
+ return new DoubleColumn(
+ arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+ }
+
@Override
public void reverse() {
for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index efa1243028..b1831d15cb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -121,6 +121,14 @@ public class FloatColumn implements Column {
return new FloatColumn(positionOffset + arrayOffset, length, valueIsNull, values);
}
+ /**
+ * Returns a view of this column that starts at {@code fromIndex}. The view shares {@code values}
+ * and {@code valueIsNull} with this column and only uses a shifted array offset.
+ *
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ @Override
+ public Column subColumn(int fromIndex) {
+ // a negative index would make the view start before this column's own offset
+ if (fromIndex < 0 || fromIndex > positionCount) {
+ throw new IllegalArgumentException("fromIndex is not valid");
+ }
+ return new FloatColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+ }
+
@Override
public void reverse() {
for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 5ba777c137..f2556c75ed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -121,6 +121,14 @@ public class IntColumn implements Column {
return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, values);
}
+ /**
+ * Returns a view of this column that starts at {@code fromIndex}. The view shares {@code values}
+ * and {@code valueIsNull} with this column and only uses a shifted array offset.
+ *
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ @Override
+ public Column subColumn(int fromIndex) {
+ // a negative index would make the view start before this column's own offset
+ if (fromIndex < 0 || fromIndex > positionCount) {
+ throw new IllegalArgumentException("fromIndex is not valid");
+ }
+ return new IntColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+ }
+
@Override
public void reverse() {
for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index 4fd1fadbb2..e4364a7072 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -121,6 +121,14 @@ public class LongColumn implements Column {
return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, values);
}
+ /**
+ * Returns a view of this column that starts at {@code fromIndex}. The view shares {@code values}
+ * and {@code valueIsNull} with this column and only uses a shifted array offset.
+ *
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ @Override
+ public Column subColumn(int fromIndex) {
+ // a negative index would make the view start before this column's own offset
+ if (fromIndex < 0 || fromIndex > positionCount) {
+ throw new IllegalArgumentException("fromIndex is not valid");
+ }
+ return new LongColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+ }
+
@Override
public void reverse() {
for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index d82d7cf464..b33995ee92 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -147,6 +147,14 @@ public class RunLengthEncodedColumn implements Column {
return new RunLengthEncodedColumn(value, length);
}
+ /**
+ * Returns a run-length-encoded column over the same {@code value} that is {@code fromIndex}
+ * positions shorter than this one.
+ *
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ @Override
+ public Column subColumn(int fromIndex) {
+ // a negative index would produce a column longer than this one
+ if (fromIndex < 0 || fromIndex > positionCount) {
+ throw new IllegalArgumentException("fromIndex is not valid");
+ }
+ return new RunLengthEncodedColumn(value, positionCount - fromIndex);
+ }
+
@Override
public void reverse() {
// do nothing because the underlying column has only one value
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index 2f8176b699..3f8db4f31d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -104,6 +104,14 @@ public class TimeColumn implements Column {
return new TimeColumn(positionOffset + arrayOffset, length, values);
}
+ /**
+ * Returns a view of this time column that starts at {@code fromIndex}. The view shares {@code
+ * values} with this column and only uses a shifted array offset.
+ *
+ * @throws IllegalArgumentException if fromIndex is negative or greater than positionCount
+ */
+ @Override
+ public Column subColumn(int fromIndex) {
+ // a negative index would make the view start before this column's own offset
+ if (fromIndex < 0 || fromIndex > positionCount) {
+ throw new IllegalArgumentException("fromIndex is not valid");
+ }
+ return new TimeColumn(arrayOffset + fromIndex, positionCount - fromIndex, values);
+ }
+
@Override
public void reverse() {
for (int i = arrayOffset, j = arrayOffset + positionCount - 1; i < j; i++, j--) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 7bfd218374..42d8c96cfa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -29,8 +29,34 @@ public abstract class TsPrimitiveType implements Serializable {
/**
* get tsPrimitiveType by resultDataType.
*
- * @param dataType -given TsDataType
- * @param v -
+ * @param dataType given TsDataType
+ */
+ public static TsPrimitiveType getByType(TSDataType dataType) {
+ // every supported type maps to a holder created through the matching no-arg constructor
+ final TsPrimitiveType holder;
+ switch (dataType) {
+ case BOOLEAN:
+ holder = new TsPrimitiveType.TsBoolean();
+ break;
+ case INT32:
+ holder = new TsPrimitiveType.TsInt();
+ break;
+ case INT64:
+ holder = new TsPrimitiveType.TsLong();
+ break;
+ case FLOAT:
+ holder = new TsPrimitiveType.TsFloat();
+ break;
+ case DOUBLE:
+ holder = new TsPrimitiveType.TsDouble();
+ break;
+ case TEXT:
+ holder = new TsPrimitiveType.TsBinary();
+ break;
+ case VECTOR:
+ holder = new TsPrimitiveType.TsVector();
+ break;
+ default:
+ throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
+ }
+ return holder;
+ }
+
+ /**
+ * get tsPrimitiveType by resultDataType and initial value.
+ *
+ * @param dataType given TsDataType
+ * @param v initial value
*/
public static TsPrimitiveType getByType(TSDataType dataType, Object v) {
switch (dataType) {
@@ -109,6 +135,10 @@ public abstract class TsPrimitiveType implements Serializable {
throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
}
+ /**
+ * Sets the held value from an untyped object. The sub-classes in this file accept only an
+ * instance of their own value type and throw UnSupportedDataTypeException for anything else.
+ *
+ * @param val the new value
+ */
+ public abstract void setObject(Object val);
+
+ /**
+ * Clears the held value. The sub-classes in this file restore false, zero or null depending on
+ * their value type.
+ */
+ public abstract void reset();
+
/**
* get the size of one instance of current class.
*
@@ -142,6 +172,8 @@ public abstract class TsPrimitiveType implements Serializable {
private boolean value;
+ /** Creates a TsBoolean without an explicit initial value, so the value starts as false. */
+ public TsBoolean() {}
+
public TsBoolean(boolean value) {
this.value = value;
}
@@ -156,6 +188,20 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ /**
+ * Stores {@code val} as the new value.
+ *
+ * @param val must be a Boolean
+ * @throws UnSupportedDataTypeException if val is null or of any other type
+ */
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Boolean)) {
+ throw new UnSupportedDataTypeException("TsBoolean can only be set Boolean value");
+ }
+ setBoolean((Boolean) val);
+ }
+
+ /** Puts the value back to false. */
+ @Override
+ public void reset() {
+ this.value = false;
+ }
+
@Override
public int getSize() {
return 4 + 1;
@@ -198,6 +244,8 @@ public abstract class TsPrimitiveType implements Serializable {
private int value;
+ /** Creates a TsInt without an explicit initial value, so the value starts as 0. */
+ public TsInt() {}
+
public TsInt(int value) {
this.value = value;
}
@@ -212,6 +260,20 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ /**
+ * Stores {@code val} as the new value.
+ *
+ * @param val must be an Integer
+ * @throws UnSupportedDataTypeException if val is null or of any other type
+ */
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Integer)) {
+ throw new UnSupportedDataTypeException("TsInt can only be set Integer value");
+ }
+ setInt((Integer) val);
+ }
+
+ /** Puts the value back to 0. */
+ @Override
+ public void reset() {
+ this.value = 0;
+ }
+
@Override
public int getSize() {
return 4 + 4;
@@ -254,6 +316,8 @@ public abstract class TsPrimitiveType implements Serializable {
private long value;
+ /** Creates a TsLong without an explicit initial value, so the value starts as 0. */
+ public TsLong() {}
+
public TsLong(long value) {
this.value = value;
}
@@ -268,6 +332,20 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ /**
+ * Stores {@code val} as the new value.
+ *
+ * @param val must be a Long
+ * @throws UnSupportedDataTypeException if val is null or of any other type
+ */
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Long)) {
+ throw new UnSupportedDataTypeException("TsLong can only be set Long value");
+ }
+ setLong((Long) val);
+ }
+
+ /** Puts the value back to 0. */
+ @Override
+ public void reset() {
+ this.value = 0L;
+ }
+
@Override
public int getSize() {
return 4 + 8;
@@ -310,6 +388,8 @@ public abstract class TsPrimitiveType implements Serializable {
private float value;
+ /** Creates a TsFloat without an explicit initial value, so the value starts as 0. */
+ public TsFloat() {}
+
public TsFloat(float value) {
this.value = value;
}
@@ -324,6 +404,20 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ /**
+ * Stores {@code val} as the new value.
+ *
+ * @param val must be a Float
+ * @throws UnSupportedDataTypeException if val is null or of any other type
+ */
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Float)) {
+ throw new UnSupportedDataTypeException("TsFloat can only be set float value");
+ }
+ setFloat((Float) val);
+ }
+
+ /** Puts the value back to 0. */
+ @Override
+ public void reset() {
+ this.value = 0f;
+ }
+
@Override
public int getSize() {
return 4 + 4;
@@ -366,6 +460,8 @@ public abstract class TsPrimitiveType implements Serializable {
private double value;
+ /** Creates a TsDouble without an explicit initial value, so the value starts as 0. */
+ public TsDouble() {}
+
public TsDouble(double value) {
this.value = value;
}
@@ -380,6 +476,20 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ /**
+ * Stores {@code val} as the new value.
+ *
+ * @param val must be a Double
+ * @throws UnSupportedDataTypeException if val is null or of any other type
+ */
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Double)) {
+ throw new UnSupportedDataTypeException("TsDouble can only be set Double value");
+ }
+ setDouble((Double) val);
+ }
+
+ /** Puts the value back to 0. */
+ @Override
+ public void reset() {
+ this.value = 0.0d;
+ }
+
@Override
public int getSize() {
return 4 + 8;
@@ -422,6 +532,8 @@ public abstract class TsPrimitiveType implements Serializable {
private Binary value;
+ /**
+ * Creates a TsBinary without an initial value, so the value starts as null.
+ * NOTE(review): getSize() dereferences the value, so it would throw NullPointerException until a
+ * value has been set — confirm that callers never ask for the size of an empty holder.
+ */
+ public TsBinary() {}
+
public TsBinary(Binary value) {
this.value = value;
}
@@ -436,6 +548,20 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ /**
+ * Stores {@code val} as the new value.
+ *
+ * @param val must be a Binary
+ * @throws UnSupportedDataTypeException if val is null or of any other type
+ */
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Binary)) {
+ throw new UnSupportedDataTypeException("TsBinary can only be set Binary value");
+ }
+ setBinary((Binary) val);
+ }
+
+ /** Drops the held Binary, leaving the value null. */
+ @Override
+ public void reset() {
+ this.value = null;
+ }
+
@Override
public int getSize() {
return 4 + 4 + value.getLength();
@@ -478,6 +604,8 @@ public abstract class TsPrimitiveType implements Serializable {
private TsPrimitiveType[] values;
+ /** Creates a TsVector without initial values, so the values array starts as null. */
+ public TsVector() {}
+
public TsVector(TsPrimitiveType[] values) {
this.values = values;
}
@@ -492,6 +620,20 @@ public abstract class TsPrimitiveType implements Serializable {
this.values = vals;
}
+ /**
+ * Stores {@code val} as the new values array.
+ *
+ * @param val must be a TsPrimitiveType[]
+ * @throws UnSupportedDataTypeException if val is null or of any other type
+ */
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof TsPrimitiveType[])) {
+ throw new UnSupportedDataTypeException("TsVector can only be set TsPrimitiveType[] value");
+ }
+ setVector((TsPrimitiveType[]) val);
+ }
+
+ /** Drops the held array, leaving the values null. */
+ @Override
+ public void reset() {
+ this.values = null;
+ }
+
@Override
public int getSize() {
int size = 0;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
index 75d28596e9..669d3a41ad 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.common.block;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
@@ -30,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
@@ -322,4 +324,51 @@ public class TsBlockTest {
}
}
}
+
+ /**
+ * Checks that subTsBlock(fromIndex) yields a view starting at fromIndex, that such views can be
+ * nested, and that a fromIndex beyond the view's position count is rejected.
+ */
+ @Test
+ public void testSubTsBlock() {
+ int rowCount = 10;
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ for (int i = 0; i < rowCount; i++) {
+ builder.getTimeColumnBuilder().writeLong(i);
+ builder.getColumnBuilder(0).writeInt(i);
+ builder.declarePosition();
+ }
+ TsBlock tsBlock = builder.build();
+ TsBlockSingleColumnIterator iterator = tsBlock.getTsBlockSingleColumnIterator();
+ int index = 0;
+ while (iterator.hasNext()) {
+ Assert.assertEquals(index, iterator.currentTime());
+ Assert.assertEquals(index, iterator.currentValue());
+ iterator.next();
+ index++;
+ }
+ // make sure the iterator walked every row instead of stopping early
+ Assert.assertEquals(rowCount, index);
+ // get subTsBlock from TsBlock, offset = 3
+ int offset = 3;
+ TsBlock subTsBlock = tsBlock.subTsBlock(offset);
+ iterator = subTsBlock.getTsBlockSingleColumnIterator();
+ index = offset;
+ while (iterator.hasNext()) {
+ Assert.assertEquals(index, iterator.currentTime());
+ Assert.assertEquals(index, iterator.currentValue());
+ iterator.next();
+ index++;
+ }
+ Assert.assertEquals(rowCount, index);
+ // get subSubTsBlock from subTsBlock, offset = 2
+ int nextOffset = 2;
+ TsBlock subSubTsBlock = subTsBlock.subTsBlock(nextOffset);
+ iterator = subSubTsBlock.getTsBlockSingleColumnIterator();
+ index = offset + nextOffset;
+ while (iterator.hasNext()) {
+ Assert.assertEquals(index, iterator.currentTime());
+ Assert.assertEquals(index, iterator.currentValue());
+ iterator.next();
+ index++;
+ }
+ Assert.assertEquals(rowCount, index);
+ // the previous fromIndex of 3 was still inside the 5 remaining positions and never threw;
+ // use the first index that is really out of range and fail if it is accepted
+ int invalidFromIndex = subSubTsBlock.getPositionCount() + 1;
+ try {
+ subSubTsBlock.subTsBlock(invalidFromIndex);
+ Assert.fail("subTsBlock should reject a fromIndex greater than positionCount");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(
+ e.getMessage().contains("FromIndex of subTsBlock cannot over positionCount."));
+ }
+ }
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java
new file mode 100644
index 0000000000..f21df099f7
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@code subColumn} on each column type. Every test takes a sub-column at offset 5 of a
+ * ten-position column, then a sub-column at offset 3 of that result, and checks position counts,
+ * surviving values, and that out-of-range reads and offsets raise {@link
+ * IllegalArgumentException}.
+ */
+public class ColumnTest {
+
+  /** Message fragment expected when a value is read at a position outside the column. */
+  private static final String POSITION_ERROR = "position is not valid";
+
+  /** Message fragment expected when {@code subColumn} is given an offset outside the column. */
+  private static final String FROM_INDEX_ERROR = "fromIndex is not valid";
+
+  /** Asserts that the message of {@code exception} contains {@code fragment}. */
+  private static void assertMessageContains(IllegalArgumentException exception, String fragment) {
+    Assert.assertTrue(exception.getMessage().contains(fragment));
+  }
+
+  /** Sub-column checks for a {@link TimeColumn} filled with 0..9. */
+  @Test
+  public void timeColumnSubColumnTest() {
+    TimeColumnBuilder builder = new TimeColumnBuilder(null, 10);
+    for (int value = 0; value < 10; value++) {
+      builder.writeLong(value);
+    }
+    TimeColumn source = (TimeColumn) builder.build();
+
+    // Offset 5 of ten positions: 5..9 are expected to remain.
+    TimeColumn tail = (TimeColumn) source.subColumn(5);
+    Assert.assertEquals(5, tail.getPositionCount());
+    Assert.assertEquals(5, tail.getLong(0));
+    Assert.assertEquals(9, tail.getLong(4));
+    try {
+      tail.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 of the remaining five: only 8 and 9 are expected to remain.
+    TimeColumn shortTail = (TimeColumn) tail.subColumn(3);
+    Assert.assertEquals(2, shortTail.getPositionCount());
+    Assert.assertEquals(8, shortTail.getLong(0));
+    Assert.assertEquals(9, shortTail.getLong(1));
+    try {
+      shortTail.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 exceeds the two remaining positions.
+    try {
+      shortTail.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, FROM_INDEX_ERROR);
+    }
+  }
+
+  /** Sub-column checks for a {@link BinaryColumn} filled with the strings "0".."9". */
+  @Test
+  public void binaryColumnSubColumnTest() {
+    BinaryColumnBuilder builder = new BinaryColumnBuilder(null, 10);
+    for (int value = 0; value < 10; value++) {
+      builder.writeBinary(Binary.valueOf(String.valueOf(value)));
+    }
+    BinaryColumn source = (BinaryColumn) builder.build();
+
+    // Offset 5 of ten positions: "5".."9" are expected to remain.
+    BinaryColumn tail = (BinaryColumn) source.subColumn(5);
+    Assert.assertEquals(5, tail.getPositionCount());
+    Assert.assertEquals("5", tail.getBinary(0).toString());
+    Assert.assertEquals("9", tail.getBinary(4).toString());
+    try {
+      tail.getBinary(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 of the remaining five: only "8" and "9" are expected to remain.
+    BinaryColumn shortTail = (BinaryColumn) tail.subColumn(3);
+    Assert.assertEquals(2, shortTail.getPositionCount());
+    Assert.assertEquals("8", shortTail.getBinary(0).toString());
+    Assert.assertEquals("9", shortTail.getBinary(1).toString());
+    try {
+      shortTail.getBinary(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 exceeds the two remaining positions.
+    try {
+      shortTail.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, FROM_INDEX_ERROR);
+    }
+  }
+
+  /** Sub-column checks for a {@link BooleanColumn} with alternating values. */
+  @Test
+  public void booleanColumnSubColumnTest() {
+    BooleanColumnBuilder builder = new BooleanColumnBuilder(null, 10);
+    // Even positions are written as true, odd positions as false.
+    for (int position = 0; position < 10; position++) {
+      builder.writeBoolean(position % 2 == 0);
+    }
+    BooleanColumn source = (BooleanColumn) builder.build();
+
+    // Offset 5 of ten positions: original positions 5 and 9 are both odd, hence false.
+    BooleanColumn tail = (BooleanColumn) source.subColumn(5);
+    Assert.assertEquals(5, tail.getPositionCount());
+    Assert.assertFalse(tail.getBoolean(0));
+    Assert.assertFalse(tail.getBoolean(4));
+    try {
+      tail.getBoolean(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 of the remaining five: original positions 8 (true) and 9 (false) are expected.
+    BooleanColumn shortTail = (BooleanColumn) tail.subColumn(3);
+    Assert.assertEquals(2, shortTail.getPositionCount());
+    Assert.assertTrue(shortTail.getBoolean(0));
+    Assert.assertFalse(shortTail.getBoolean(1));
+    try {
+      shortTail.getBoolean(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 exceeds the two remaining positions.
+    try {
+      shortTail.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, FROM_INDEX_ERROR);
+    }
+  }
+
+  /** Sub-column checks for a {@link DoubleColumn} filled with 0.0..9.0. */
+  @Test
+  public void doubleColumnSubColumnTest() {
+    DoubleColumnBuilder builder = new DoubleColumnBuilder(null, 10);
+    for (int value = 0; value < 10; value++) {
+      builder.writeDouble(value);
+    }
+    DoubleColumn source = (DoubleColumn) builder.build();
+
+    // Offset 5 of ten positions: 5.0..9.0 are expected to remain.
+    DoubleColumn tail = (DoubleColumn) source.subColumn(5);
+    Assert.assertEquals(5, tail.getPositionCount());
+    Assert.assertEquals(5.0, tail.getDouble(0), 0.001);
+    Assert.assertEquals(9.0, tail.getDouble(4), 0.001);
+    try {
+      tail.getDouble(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 of the remaining five: only 8.0 and 9.0 are expected to remain.
+    DoubleColumn shortTail = (DoubleColumn) tail.subColumn(3);
+    Assert.assertEquals(2, shortTail.getPositionCount());
+    Assert.assertEquals(8.0, shortTail.getDouble(0), 0.001);
+    Assert.assertEquals(9.0, shortTail.getDouble(1), 0.001);
+    try {
+      shortTail.getDouble(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 exceeds the two remaining positions.
+    try {
+      shortTail.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, FROM_INDEX_ERROR);
+    }
+  }
+
+  /** Sub-column checks for a {@link FloatColumn} filled with 0.0f..9.0f. */
+  @Test
+  public void floatColumnSubColumnTest() {
+    FloatColumnBuilder builder = new FloatColumnBuilder(null, 10);
+    for (int value = 0; value < 10; value++) {
+      builder.writeFloat(value);
+    }
+    FloatColumn source = (FloatColumn) builder.build();
+
+    // Offset 5 of ten positions: 5.0..9.0 are expected to remain.
+    FloatColumn tail = (FloatColumn) source.subColumn(5);
+    Assert.assertEquals(5, tail.getPositionCount());
+    Assert.assertEquals(5.0, tail.getFloat(0), 0.001);
+    Assert.assertEquals(9.0, tail.getFloat(4), 0.001);
+    try {
+      tail.getFloat(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 of the remaining five: only 8.0 and 9.0 are expected to remain.
+    FloatColumn shortTail = (FloatColumn) tail.subColumn(3);
+    Assert.assertEquals(2, shortTail.getPositionCount());
+    Assert.assertEquals(8.0, shortTail.getFloat(0), 0.001);
+    Assert.assertEquals(9.0, shortTail.getFloat(1), 0.001);
+    try {
+      shortTail.getFloat(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 exceeds the two remaining positions.
+    try {
+      shortTail.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, FROM_INDEX_ERROR);
+    }
+  }
+
+  /** Sub-column checks for an {@link IntColumn} filled with 0..9. */
+  @Test
+  public void intColumnSubColumnTest() {
+    IntColumnBuilder builder = new IntColumnBuilder(null, 10);
+    for (int value = 0; value < 10; value++) {
+      builder.writeInt(value);
+    }
+    IntColumn source = (IntColumn) builder.build();
+
+    // Offset 5 of ten positions: 5..9 are expected to remain.
+    IntColumn tail = (IntColumn) source.subColumn(5);
+    Assert.assertEquals(5, tail.getPositionCount());
+    Assert.assertEquals(5, tail.getInt(0));
+    Assert.assertEquals(9, tail.getInt(4));
+    try {
+      tail.getInt(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 of the remaining five: only 8 and 9 are expected to remain.
+    IntColumn shortTail = (IntColumn) tail.subColumn(3);
+    Assert.assertEquals(2, shortTail.getPositionCount());
+    Assert.assertEquals(8, shortTail.getInt(0));
+    Assert.assertEquals(9, shortTail.getInt(1));
+    try {
+      shortTail.getInt(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 exceeds the two remaining positions.
+    try {
+      shortTail.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, FROM_INDEX_ERROR);
+    }
+  }
+
+  /** Sub-column checks for a {@link LongColumn} filled with 0..9. */
+  @Test
+  public void longColumnSubColumnTest() {
+    LongColumnBuilder builder = new LongColumnBuilder(null, 10);
+    for (int value = 0; value < 10; value++) {
+      builder.writeLong(value);
+    }
+    LongColumn source = (LongColumn) builder.build();
+
+    // Offset 5 of ten positions: 5..9 are expected to remain.
+    LongColumn tail = (LongColumn) source.subColumn(5);
+    Assert.assertEquals(5, tail.getPositionCount());
+    Assert.assertEquals(5, tail.getLong(0));
+    Assert.assertEquals(9, tail.getLong(4));
+    try {
+      tail.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 of the remaining five: only 8 and 9 are expected to remain.
+    LongColumn shortTail = (LongColumn) tail.subColumn(3);
+    Assert.assertEquals(2, shortTail.getPositionCount());
+    Assert.assertEquals(8, shortTail.getLong(0));
+    Assert.assertEquals(9, shortTail.getLong(1));
+    try {
+      shortTail.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 exceeds the two remaining positions.
+    try {
+      shortTail.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, FROM_INDEX_ERROR);
+    }
+  }
+
+  /**
+   * Sub-column checks for a {@link RunLengthEncodedColumn} built from a single long value (1) and
+   * a position count of 10; every readable position is expected to return that value.
+   */
+  @Test
+  public void runLengthEncodedColumnSubColumnTest() {
+    LongColumnBuilder valueBuilder = new LongColumnBuilder(null, 1);
+    valueBuilder.writeLong(1);
+    RunLengthEncodedColumn source = new RunLengthEncodedColumn(valueBuilder.build(), 10);
+
+    // Offset 5 of ten positions: five positions are expected to remain.
+    RunLengthEncodedColumn tail = (RunLengthEncodedColumn) source.subColumn(5);
+    Assert.assertEquals(5, tail.getPositionCount());
+    Assert.assertEquals(1, tail.getLong(0));
+    Assert.assertEquals(1, tail.getLong(4));
+    try {
+      tail.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 of the remaining five: two positions are expected to remain.
+    RunLengthEncodedColumn shortTail = (RunLengthEncodedColumn) tail.subColumn(3);
+    Assert.assertEquals(2, shortTail.getPositionCount());
+    Assert.assertEquals(1, shortTail.getLong(0));
+    Assert.assertEquals(1, shortTail.getLong(1));
+    try {
+      shortTail.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, POSITION_ERROR);
+    }
+
+    // Offset 3 exceeds the two remaining positions.
+    try {
+      shortTail.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      assertMessageContains(e, FROM_INDEX_ERROR);
+    }
+  }
+}