You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/01 08:56:54 UTC
[iotdb] 05/09: implement part accumulator
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1065953784e6c2ed8aabf10ef255f2a0efb05394
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Apr 29 17:00:09 2022 +0800
implement part accumulator
---
.../db/mpp/operator/aggregation/Accumulator.java | 27 +++-
...regatorFactory.java => AccumulatorFactory.java} | 26 +++-
.../db/mpp/operator/aggregation/Aggregator.java | 18 +--
.../mpp/operator/aggregation/AvgAccumulator.java | 76 +++++++++--
.../mpp/operator/aggregation/CountAccumulator.java | 20 ++-
.../operator/aggregation/ExtremeAccumulator.java | 142 +++++++++++++++++++++
...atorFactory.java => FirstValueAccumulator.java} | 5 +-
...gatorFactory.java => LastValueAccumulator.java} | 5 +-
...regatorFactory.java => MaxTimeAccumulator.java} | 5 +-
...ntAccumulator.java => MaxValueAccumulator.java} | 52 ++++++--
...regatorFactory.java => MinTimeAccumulator.java} | 5 +-
...ntAccumulator.java => MinValueAccumulator.java} | 54 ++++++--
.../{CountAccumulator.java => SumAccumulator.java} | 70 ++++++++--
.../source/SeriesAggregateScanOperator.java | 26 ++--
.../plan/parameter/AggregationDescriptor.java | 4 +
.../operator/SeriesAggregateScanOperatorTest.java | 45 ++++---
.../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 139 +++++++++++++++++++-
17 files changed, 606 insertions(+), 113 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
index f6f268aad1..5833a75959 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.operator.aggregation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -25,20 +26,44 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
public interface Accumulator {
- // Column should be like: | Time | Value |
+ /** Input columns should be laid out as: | Time | Value | */
void addInput(Column[] column, TimeRange timeRange);
+ /**
+ * For aggregation functions like COUNT and SUM, partialResult is a single column; for AVG and
+ * last_value it is two columns in dictionary order.
+ */
void addIntermediate(Column[] partialResult);
+ /**
+ * This method may only be used by SeriesAggregateScanOperator; which statistics are read
+ * depends on the type of Accumulator.
+ */
void addStatistics(Statistics statistics);
+ /**
+ * Attention: setFinal should be invoked only once; afterwards addInput() and addIntermediate()
+ * must not be called again.
+ */
void setFinal(Column finalResult);
+ /**
+ * For aggregation functions like COUNT and SUM the partial result is a single column, so a single
+ * output column is written; for AVG and last_value two columns are written in dictionary order.
+ */
void outputIntermediate(ColumnBuilder[] tsBlockBuilder);
+ /** The final result is a single column for every aggregation function. */
void outputFinal(ColumnBuilder tsBlockBuilder);
void reset();
+ /**
+ * For first_value, or last_value in decreasing order, the final result is given by the first
+ * record.
+ */
boolean hasFinalResult();
+
+ TSDataType[] getIntermediateType();
+
+ TSDataType getFinalType();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
index fb49e8c8c1..1eb421c4e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
@@ -19,4 +19,28 @@
package org.apache.iotdb.db.mpp.operator.aggregation;
-public class AggregatorFactory {}
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/** Creates the {@link Accumulator} that implements a given aggregation function. */
+public class AccumulatorFactory {
+
+ /**
+ * Builds the accumulator for one aggregation function.
+ *
+ * @param aggregationType the aggregation function to evaluate
+ * @param tsDataType data type of the aggregated series, passed to type-dependent accumulators
+ * @return a new accumulator, never null
+ * @throws IllegalArgumentException if no accumulator is implemented for aggregationType
+ */
+ public static Accumulator createAccumulator(
+ AggregationType aggregationType, TSDataType tsDataType) {
+ switch (aggregationType) {
+ case COUNT:
+ return new CountAccumulator();
+ case AVG:
+ return new AvgAccumulator(tsDataType);
+ case SUM:
+ return new SumAccumulator(tsDataType);
+ case EXTREME:
+ return new ExtremeAccumulator(tsDataType);
+ case MAX_VALUE:
+ return new MaxValueAccumulator(tsDataType);
+ case MIN_VALUE:
+ return new MinValueAccumulator(tsDataType);
+ case MAX_TIME:
+ case MIN_TIME:
+ case LAST_VALUE:
+ case FIRST_VALUE:
+ default:
+ // Fail fast with a clear message: returning null here would only surface later as a
+ // NullPointerException at the first use of the accumulator.
+ throw new IllegalArgumentException(
+ "Unsupported aggregation function: " + aggregationType);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index 8a05f460c5..a808c734bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -36,22 +36,14 @@ public class Aggregator {
// In some intermediate result input, inputLocation[] should include two columns
private final List<InputLocation[]> inputLocationList;
private final AggregationStep step;
- private final TSDataType intermediateType;
- private final TSDataType finalType;
- private TimeRange timeRange;
+ private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
public Aggregator(
- Accumulator accumulator,
- AggregationStep step,
- List<InputLocation[]> inputLocationList,
- TSDataType intermediateType,
- TSDataType finalType) {
+ Accumulator accumulator, AggregationStep step, List<InputLocation[]> inputLocationList) {
this.accumulator = accumulator;
this.step = step;
this.inputLocationList = inputLocationList;
- this.intermediateType = intermediateType;
- this.finalType = finalType;
}
// Used for SeriesAggregateScanOperator
@@ -96,11 +88,11 @@ public class Aggregator {
accumulator.addStatistics(statistics);
}
- public TSDataType getOutputType() {
+ /**
+ * Returns the data types of the columns this aggregator outputs: the accumulator's intermediate
+ * column types when the step outputs partial results, otherwise its single final type.
+ */
+ public TSDataType[] getOutputType() {
if (step.isOutputPartial()) {
- return intermediateType;
+ return accumulator.getIntermediateType();
} else {
- return finalType;
+ return new TSDataType[] {accumulator.getFinalType()};
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
index dc993a65c5..0898eefcb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
@@ -21,10 +21,12 @@ package org.apache.iotdb.db.mpp.operator.aggregation;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
public class AvgAccumulator implements Accumulator {
@@ -32,23 +34,60 @@ public class AvgAccumulator implements Accumulator {
private long countValue;
private double sumValue;
+ public AvgAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ }
+
@Override
- public void addInput(Column[] column, TimeRange timeRange) {}
+ public void addInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ countValue++;
+ updateSumValue(column[1].getObject(i));
+ }
+ }
+ // partialResult should be like: | countValue1 | sumValue1 |
+ // The count column is INT64 and the sum column is DOUBLE (see getIntermediateType()).
@Override
- public void addIntermediate(Column[] partialResult) {}
+ public void addIntermediate(Column[] partialResult) {
+ if (partialResult.length != 2) {
+ throw new IllegalArgumentException("partialResult of Avg should be 2");
+ }
+ countValue += partialResult[0].getLong(0);
+ // Read the partial sum as a double directly: unboxing it by seriesDataType would throw a
+ // ClassCastException for every series type other than DOUBLE.
+ sumValue += partialResult[1].getDouble(0);
+ }
@Override
- public void addStatistics(Statistics statistics) {}
+ public void addStatistics(Statistics statistics) {
+ countValue += statistics.getCount();
+ if (statistics instanceof IntegerStatistics) {
+ sumValue += statistics.getSumLongValue();
+ } else {
+ sumValue += statistics.getSumDoubleValue();
+ }
+ }
+ // finalResult is a single DOUBLE column holding the average. Store it as the sum and pin
+ // countValue to 1 so that outputFinal() (sumValue / countValue) reproduces it.
@Override
- public void setFinal(Column finalResult) {}
+ public void setFinal(Column finalResult) {
+ reset();
+ countValue = 1;
+ sumValue = finalResult.getDouble(0);
+ }
@Override
- public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {}
+ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+ columnBuilders[0].writeLong(countValue);
+ columnBuilders[1].writeDouble(sumValue);
+ }
@Override
- public void outputFinal(ColumnBuilder tsBlockBuilder) {}
+ public void outputFinal(ColumnBuilder columnBuilder) {
+ columnBuilder.writeDouble(sumValue / countValue);
+ }
@Override
public void reset() {
@@ -61,26 +100,35 @@ public class AvgAccumulator implements Accumulator {
return false;
}
- private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException {
- double val;
- switch (type) {
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {TSDataType.INT64, TSDataType.DOUBLE};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return TSDataType.DOUBLE;
+ }
+
+ // Adds one raw series value, boxed according to seriesDataType, to the running sum.
+ private void updateSumValue(Object sumVal) throws UnSupportedDataTypeException {
+ switch (seriesDataType) {
case INT32:
- val = (int) sumVal;
+ sumValue += (int) sumVal;
break;
case INT64:
- val = (long) sumVal;
+ sumValue += (long) sumVal;
break;
case FLOAT:
- val = (float) sumVal;
+ sumValue += (float) sumVal;
break;
case DOUBLE:
- val = (double) sumVal;
+ sumValue += (double) sumVal;
break;
case TEXT:
case BOOLEAN:
default:
throw new UnSupportedDataTypeException(
- String.format("Unsupported data type in aggregation AVG : %s", type));
+ String.format("Unsupported data type in aggregation AVG : %s", seriesDataType));
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
index db266b55e4..84dae3f986 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.operator.aggregation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -29,6 +30,8 @@ public class CountAccumulator implements Accumulator {
private long countValue = 0;
+ public CountAccumulator() {}
+
// Column should be like: | Time | Value |
@Override
public void addInput(Column[] column, TimeRange timeRange) {
@@ -42,12 +45,13 @@ public class CountAccumulator implements Accumulator {
}
}
- // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+ // partialResult should be like: | partialCountValue1 |
@Override
public void addIntermediate(Column[] partialResult) {
- for (int i = 0; i < partialResult.length; i++) {
- countValue += partialResult[i].getLong(0);
+ if (partialResult.length != 1) {
+ throw new IllegalArgumentException("partialResult of Count should be 1");
}
+ countValue += partialResult[0].getLong(0);
}
@Override
@@ -81,4 +85,14 @@ public class CountAccumulator implements Accumulator {
public boolean hasFinalResult() {
return false;
}
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {TSDataType.INT64};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return TSDataType.INT64;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java
new file mode 100644
index 0000000000..599feec646
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public class ExtremeAccumulator implements Accumulator {
+
+ private TsPrimitiveType extremeResult;
+ private boolean hasCandidateResult;
+
+ public ExtremeAccumulator(TSDataType seriesDataType) {
+ this.extremeResult = TsPrimitiveType.getByType(seriesDataType);
+ }
+
+ @Override
+ public void addInput(Column[] column, TimeRange timeRange) {
+ TimeColumn timeColumn = (TimeColumn) column[0];
+ for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+ long curTime = timeColumn.getLong(i);
+ if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+ break;
+ }
+ updateResult((Comparable<Object>) column[1].getObject(i));
+ }
+ }
+
+ @Override
+ public void addIntermediate(Column[] partialResult) {
+ if (partialResult.length != 1) {
+ throw new IllegalArgumentException("partialResult of ExtremeValue should be 1");
+ }
+ updateResult((Comparable<Object>) partialResult[0].getObject(0));
+ }
+
+ @Override
+ public void addStatistics(Statistics statistics) {
+ Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue();
+ Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
+
+ Comparable<Object> absMaxVal = (Comparable<Object>) getAbsValue(maxVal);
+ Comparable<Object> absMinVal = (Comparable<Object>) getAbsValue(minVal);
+
+ Comparable<Object> extVal = absMaxVal.compareTo(absMinVal) >= 0 ? maxVal : minVal;
+ updateResult(extVal);
+ }
+
+ @Override
+ public void setFinal(Column finalResult) {
+ extremeResult.setObject(finalResult.getObject(0));
+ }
+
+ // columnBuilder should be single in ExtremeAccumulator
+ @Override
+ public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+ columnBuilders[0].writeObject(extremeResult.getValue());
+ }
+
+ @Override
+ public void outputFinal(ColumnBuilder columnBuilder) {
+ columnBuilder.writeObject(extremeResult.getValue());
+ }
+
+ @Override
+ public void reset() {
+ hasCandidateResult = false;
+ extremeResult.reset();
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {extremeResult.getDataType()};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return extremeResult.getDataType();
+ }
+
+ public Object getAbsValue(Object v) {
+ switch (extremeResult.getDataType()) {
+ case DOUBLE:
+ return Math.abs((Double) v);
+ case FLOAT:
+ return Math.abs((Float) v);
+ case INT32:
+ return Math.abs((Integer) v);
+ case INT64:
+ return Math.abs((Long) v);
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(extremeResult.getDataType()));
+ }
+ }
+
+ // Replaces the stored extreme when extVal has a larger absolute value, or the same absolute
+ // value and a larger signed value. Null inputs are ignored; the first non-null input is always
+ // accepted.
+ private void updateResult(Comparable<Object> extVal) {
+ if (extVal == null) {
+ return;
+ }
+
+ Comparable<Object> absInput = (Comparable<Object>) getAbsValue(extVal);
+ Comparable<Object> current = (Comparable<Object>) extremeResult.getValue();
+ Comparable<Object> absCurrent = (Comparable<Object>) getAbsValue(extremeResult.getValue());
+
+ boolean replace = !hasCandidateResult;
+ if (!replace) {
+ int absOrder = absInput.compareTo(absCurrent);
+ replace = absOrder > 0 || (absOrder == 0 && extVal.compareTo(current) > 0);
+ }
+ if (replace) {
+ hasCandidateResult = true;
+ extremeResult.setObject(extVal);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
similarity index 89%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index fb49e8c8c1..49ad6a5133 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -17,6 +17,5 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+/** Placeholder for the first_value aggregation; no accumulator logic is implemented yet. */
+public class FirstValueAccumulator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
similarity index 89%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index fb49e8c8c1..97183aae52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -17,6 +17,5 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+/** Placeholder for the last_value aggregation; no accumulator logic is implemented yet. */
+public class LastValueAccumulator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
similarity index 90%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
index fb49e8c8c1..c5d03f8442 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
@@ -17,6 +17,5 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+/** Placeholder for the max_time aggregation; no accumulator logic is implemented yet. */
+public class MaxTimeAccumulator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
index db266b55e4..bb3f1d880a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
@@ -19,15 +19,22 @@
package org.apache.iotdb.db.mpp.operator.aggregation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-public class CountAccumulator implements Accumulator {
+public class MaxValueAccumulator implements Accumulator {
- private long countValue = 0;
+ private TsPrimitiveType maxResult;
+ private boolean hasCandidateResult;
+
+ public MaxValueAccumulator(TSDataType seriesDataType) {
+ this.maxResult = TsPrimitiveType.getByType(seriesDataType);
+ }
// Column should be like: | Time | Value |
@Override
@@ -38,47 +45,70 @@ public class CountAccumulator implements Accumulator {
if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
break;
}
- countValue++;
+ updateResult((Comparable<Object>) column[1].getObject(i));
}
}
- // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+ // partialResult should be like: | partialMaxValue1 |
@Override
public void addIntermediate(Column[] partialResult) {
- for (int i = 0; i < partialResult.length; i++) {
- countValue += partialResult[i].getLong(0);
+ if (partialResult.length != 1) {
+ throw new IllegalArgumentException("partialResult of MaxValue should be 1");
}
+ updateResult((Comparable<Object>) partialResult[0].getObject(0));
}
@Override
public void addStatistics(Statistics statistics) {
- countValue += statistics.getCount();
+ Comparable<Object> maxValue = (Comparable<Object>) statistics.getMaxValue();
+ updateResult(maxValue);
}
// finalResult should be single column, like: | finalCountValue |
@Override
public void setFinal(Column finalResult) {
- countValue = finalResult.getLong(0);
+ maxResult.setObject(finalResult.getObject(0));
}
// columnBuilder should be single in countAccumulator
@Override
public void outputIntermediate(ColumnBuilder[] columnBuilders) {
- columnBuilders[0].writeLong(countValue);
+ columnBuilders[0].writeObject(maxResult.getValue());
}
@Override
public void outputFinal(ColumnBuilder columnBuilder) {
- columnBuilder.writeLong(countValue);
+ columnBuilder.writeObject(maxResult.getValue());
}
@Override
public void reset() {
- this.countValue = 0;
+ hasCandidateResult = false;
+ this.maxResult.reset();
}
@Override
public boolean hasFinalResult() {
return false;
}
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {maxResult.getDataType()};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return maxResult.getDataType();
+ }
+
+ // Keeps the largest non-null value seen so far; the first non-null value is always accepted.
+ private void updateResult(Comparable<Object> maxVal) {
+ if (maxVal == null) {
+ return;
+ }
+ boolean isNewMax = !hasCandidateResult || maxVal.compareTo(maxResult.getValue()) > 0;
+ if (isNewMax) {
+ hasCandidateResult = true;
+ maxResult.setObject(maxVal);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index fb49e8c8c1..95bf611acf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -17,6 +17,5 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+/** Placeholder for the min_time aggregation; no accumulator logic is implemented yet. */
+public class MinTimeAccumulator {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
index db266b55e4..97f46724ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
@@ -19,15 +19,22 @@
package org.apache.iotdb.db.mpp.operator.aggregation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-public class CountAccumulator implements Accumulator {
+public class MinValueAccumulator implements Accumulator {
- private long countValue = 0;
+ private TsPrimitiveType minResult;
+ private boolean hasCandidateResult;
+
+ public MinValueAccumulator(TSDataType seriesDataType) {
+ this.minResult = TsPrimitiveType.getByType(seriesDataType);
+ }
// Column should be like: | Time | Value |
@Override
@@ -38,47 +45,70 @@ public class CountAccumulator implements Accumulator {
if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
break;
}
- countValue++;
+ updateResult((Comparable<Object>) column[1].getObject(i));
}
}
- // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+ // partialResult should be like: | partialMinValue1 |
@Override
public void addIntermediate(Column[] partialResult) {
- for (int i = 0; i < partialResult.length; i++) {
- countValue += partialResult[i].getLong(0);
+ if (partialResult.length != 1) {
+ throw new IllegalArgumentException("partialResult of MinValue should be 1");
}
+ updateResult((Comparable<Object>) partialResult[0].getObject(0));
}
@Override
public void addStatistics(Statistics statistics) {
- countValue += statistics.getCount();
+ Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
+ updateResult(minVal);
}
// finalResult should be single column, like: | finalCountValue |
@Override
public void setFinal(Column finalResult) {
- countValue = finalResult.getLong(0);
+ minResult.setObject(finalResult.getObject(0));
}
- // columnBuilder should be single in countAccumulator
+ // columnBuilder should be single in MinValueAccumulator
@Override
public void outputIntermediate(ColumnBuilder[] columnBuilders) {
- columnBuilders[0].writeLong(countValue);
+ columnBuilders[0].writeObject(minResult.getValue());
}
@Override
public void outputFinal(ColumnBuilder columnBuilder) {
- columnBuilder.writeLong(countValue);
+ columnBuilder.writeObject(minResult.getValue());
}
@Override
public void reset() {
- this.countValue = 0;
+ hasCandidateResult = false;
+ this.minResult.reset();
}
@Override
public boolean hasFinalResult() {
return false;
}
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {minResult.getDataType()};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return minResult.getDataType();
+ }
+
+ // Keeps the smallest non-null value seen so far; the first non-null value is always accepted.
+ private void updateResult(Comparable<Object> minVal) {
+ if (minVal == null) {
+ return;
+ }
+ boolean isNewMin = !hasCandidateResult || minVal.compareTo(minResult.getValue()) < 0;
+ if (isNewMin) {
+ hasCandidateResult = true;
+ minResult.setObject(minVal);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
index db266b55e4..7dd3896b1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
@@ -19,15 +19,23 @@
package org.apache.iotdb.db.mpp.operator.aggregation;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-public class CountAccumulator implements Accumulator {
+public class SumAccumulator implements Accumulator {
- private long countValue = 0;
+ private TSDataType seriesDataType;
+ private double sumValue = 0;
+
+ public SumAccumulator(TSDataType seriesDataType) {
+ this.seriesDataType = seriesDataType;
+ }
// Column should be like: | Time | Value |
@Override
@@ -38,47 +46,85 @@ public class CountAccumulator implements Accumulator {
if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
break;
}
- countValue++;
+ updateSumValue(column[1].getObject(i));
}
}
- // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+ // partialResult should be like: | partialSumValue1 |
@Override
public void addIntermediate(Column[] partialResult) {
- for (int i = 0; i < partialResult.length; i++) {
- countValue += partialResult[i].getLong(0);
+ if (partialResult.length != 1) {
+ throw new IllegalArgumentException("partialResult of Sum should be 1");
}
+ // The partial sum is written as DOUBLE by outputIntermediate(), whatever the series type, so
+ // read it as a double instead of unboxing it by seriesDataType (ClassCastException otherwise).
+ sumValue += partialResult[0].getDouble(0);
}
@Override
public void addStatistics(Statistics statistics) {
- countValue += statistics.getCount();
+ if (statistics instanceof IntegerStatistics) {
+ sumValue += statistics.getSumLongValue();
+ } else {
+ sumValue += statistics.getSumDoubleValue();
+ }
}
- // finalResult should be single column, like: | finalCountValue |
+ // finalResult should be single column, like: | finalSumValue |
@Override
public void setFinal(Column finalResult) {
- countValue = finalResult.getLong(0);
+ reset();
+ // The final sum is a DOUBLE column (see getFinalType()), so read it as a double rather than
+ // unboxing it by seriesDataType.
+ sumValue = finalResult.getDouble(0);
}
// columnBuilder should be single in countAccumulator
@Override
public void outputIntermediate(ColumnBuilder[] columnBuilders) {
- columnBuilders[0].writeLong(countValue);
+ columnBuilders[0].writeDouble(sumValue);
}
@Override
public void outputFinal(ColumnBuilder columnBuilder) {
- columnBuilder.writeLong(countValue);
+ columnBuilder.writeDouble(sumValue);
}
@Override
public void reset() {
- this.countValue = 0;
+ this.sumValue = 0;
}
@Override
public boolean hasFinalResult() {
return false;
}
+
+ @Override
+ public TSDataType[] getIntermediateType() {
+ return new TSDataType[] {TSDataType.DOUBLE};
+ }
+
+ @Override
+ public TSDataType getFinalType() {
+ return TSDataType.DOUBLE;
+ }
+
+ private void updateSumValue(Object sumVal) throws UnSupportedDataTypeException {
+ switch (seriesDataType) { // every numeric case accumulates into sumValue; none may overwrite it
+ case INT32:
+ sumValue += (int) sumVal;
+ break;
+ case INT64:
+ sumValue += (long) sumVal;
+ break;
+ case FLOAT:
+ sumValue += (float) sumVal;
+ break;
+ case DOUBLE:
+ sumValue += (double) sumVal;
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in aggregation SUM : %s", seriesDataType));
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 8966b1f57b..b679febcca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -39,9 +40,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* This operator is responsible to do the aggregation calculation for one series based on global
@@ -94,9 +96,11 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
null,
ascending);
this.aggregators = aggregators;
- tsBlockBuilder =
- new TsBlockBuilder(
- aggregators.stream().map(Aggregator::getOutputType).collect(Collectors.toList()));
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (Aggregator aggregator : aggregators) {
+ dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+ }
+ tsBlockBuilder = new TsBlockBuilder(dataTypes);
this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
}
@@ -216,10 +220,14 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
// Use start time of current time range as time column
timeColumnBuilder.writeLong(curTimeRange.getMin());
ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
- for (int i = 0; i < aggregators.size(); i++) {
- ColumnBuilder[] columnBuilder = new ColumnBuilder[1];
- columnBuilder[0] = columnBuilders[i];
- aggregators.get(i).outputResult(columnBuilder);
+ int columnIndex = 0;
+ for (Aggregator aggregator : aggregators) {
+ ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+ columnBuilder[0] = columnBuilders[columnIndex++];
+ if (columnBuilder.length > 1) {
+ columnBuilder[1] = columnBuilders[columnIndex++];
+ }
+ aggregator.outputResult(columnBuilder);
}
tsBlockBuilder.declarePosition();
resultTsBlock = tsBlockBuilder.build();
@@ -261,7 +269,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
@SuppressWarnings("squid:S3776")
private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
// check if the batchData does not contain points in current interval
- if (!satisfied(tsBlock, curTimeRange)) {
+ if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) {
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
index 244fbcf7b8..8f3dbff4fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
@@ -59,6 +59,10 @@ public class AggregationDescriptor {
return aggregationType;
}
+ public AggregationStep getStep() {
+ return step;
+ }
+
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
step.serialize(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index bfaced0670..c21314b5d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -30,10 +30,12 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.operator.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -89,7 +91,14 @@ public class SeriesAggregateScanOperatorTest {
public void testAggregationWithoutTimeFilter() throws IllegalPathException {
SeriesAggregateScanOperator seriesAggregateScanOperator =
initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), null, true, null);
+ Collections.singletonList(
+ new Aggregator(
+ AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.INT32),
+ AggregationStep.SINGLE,
+ null)),
+ null,
+ true,
+ null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -105,7 +114,7 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.COUNT);
aggregationTypes.add(AggregationType.SUM);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+ initSeriesAggregateScanOperator(null, null, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -126,7 +135,7 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.MAX_VALUE);
aggregationTypes.add(AggregationType.MIN_VALUE);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+ initSeriesAggregateScanOperator(null, null, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -145,8 +154,7 @@ public class SeriesAggregateScanOperatorTest {
public void testAggregationWithTimeFilter1() throws IllegalPathException {
Filter timeFilter = TimeFilter.gtEq(120);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+ initSeriesAggregateScanOperator(null, timeFilter, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -160,8 +168,7 @@ public class SeriesAggregateScanOperatorTest {
public void testAggregationWithTimeFilter2() throws IllegalPathException {
Filter timeFilter = TimeFilter.ltEq(379);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+ initSeriesAggregateScanOperator(null, timeFilter, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -175,8 +182,7 @@ public class SeriesAggregateScanOperatorTest {
public void testAggregationWithTimeFilter3() throws IllegalPathException {
Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+ initSeriesAggregateScanOperator(null, timeFilter, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -197,7 +203,7 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.MIN_VALUE);
Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, timeFilter, true, null);
+ initSeriesAggregateScanOperator(null, timeFilter, true, null);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -217,8 +223,7 @@ public class SeriesAggregateScanOperatorTest {
int[] result = new int[] {100, 100, 100, 100};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -235,11 +240,7 @@ public class SeriesAggregateScanOperatorTest {
Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT),
- timeFilter,
- true,
- groupByTimeParameter);
+ initSeriesAggregateScanOperator(null, timeFilter, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -266,7 +267,7 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.MIN_VALUE);
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -285,8 +286,7 @@ public class SeriesAggregateScanOperatorTest {
int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -303,8 +303,7 @@ public class SeriesAggregateScanOperatorTest {
int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(
- Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -332,7 +331,7 @@ public class SeriesAggregateScanOperatorTest {
aggregationTypes.add(AggregationType.MIN_VALUE);
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
SeriesAggregateScanOperator seriesAggregateScanOperator =
- initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+ initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
int count = 0;
while (seriesAggregateScanOperator.hasNext()) {
TsBlock resultTsBlock = seriesAggregateScanOperator.next();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 7bfd218374..8251fe9438 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -29,8 +29,34 @@ public abstract class TsPrimitiveType implements Serializable {
/**
* get tsPrimitiveType by resultDataType.
*
- * @param dataType -given TsDataType
- * @param v -
+ * @param dataType given TsDataType
+ */
+ public static TsPrimitiveType getByType(TSDataType dataType) {
+ switch (dataType) {
+ case BOOLEAN:
+ return new TsPrimitiveType.TsBoolean();
+ case INT32:
+ return new TsPrimitiveType.TsInt();
+ case INT64:
+ return new TsPrimitiveType.TsLong();
+ case FLOAT:
+ return new TsPrimitiveType.TsFloat();
+ case DOUBLE:
+ return new TsPrimitiveType.TsDouble();
+ case TEXT:
+ return new TsPrimitiveType.TsBinary();
+ case VECTOR:
+ return new TsPrimitiveType.TsVector();
+ default:
+ throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
+ }
+ }
+
+ /**
+ * get tsPrimitiveType by resultDataType and initial value.
+ *
+ * @param dataType given TsDataType
+ * @param v initial value
*/
public static TsPrimitiveType getByType(TSDataType dataType, Object v) {
switch (dataType) {
@@ -109,6 +135,10 @@ public abstract class TsPrimitiveType implements Serializable {
throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
}
+ public abstract void setObject(Object val);
+
+ public abstract void reset();
+
/**
* get the size of one instance of current class.
*
@@ -142,6 +172,8 @@ public abstract class TsPrimitiveType implements Serializable {
private boolean value;
+ public TsBoolean() {}
+
public TsBoolean(boolean value) {
this.value = value;
}
@@ -156,6 +188,19 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Boolean)) { // reject first so a valid value never reaches the throw
+ throw new UnSupportedDataTypeException("TsBoolean can only be set Boolean value");
+ }
+ setBoolean((Boolean) val);
+ }
+
+ @Override
+ public void reset() {
+ value = false;
+ }
+
@Override
public int getSize() {
return 4 + 1;
@@ -198,6 +243,8 @@ public abstract class TsPrimitiveType implements Serializable {
private int value;
+ public TsInt() {}
+
public TsInt(int value) {
this.value = value;
}
@@ -212,6 +259,19 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Integer)) { // reject first so a valid value never reaches the throw
+ throw new UnSupportedDataTypeException("TsInt can only be set Integer value");
+ }
+ setInt((Integer) val);
+ }
+
+ @Override
+ public void reset() {
+ value = 0;
+ }
+
@Override
public int getSize() {
return 4 + 4;
@@ -254,6 +314,8 @@ public abstract class TsPrimitiveType implements Serializable {
private long value;
+ public TsLong() {}
+
public TsLong(long value) {
this.value = value;
}
@@ -268,6 +330,19 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Long)) { // reject first so a valid value never reaches the throw
+ throw new UnSupportedDataTypeException("TsLong can only be set Long value");
+ }
+ setLong((Long) val);
+ }
+
+ @Override
+ public void reset() {
+ value = 0;
+ }
+
@Override
public int getSize() {
return 4 + 8;
@@ -310,6 +385,8 @@ public abstract class TsPrimitiveType implements Serializable {
private float value;
+ public TsFloat() {}
+
public TsFloat(float value) {
this.value = value;
}
@@ -324,6 +401,19 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Float)) { // reject first so a valid value never reaches the throw
+ throw new UnSupportedDataTypeException("TsFloat can only be set float value");
+ }
+ setFloat((Float) val);
+ }
+
+ @Override
+ public void reset() {
+ value = 0;
+ }
+
@Override
public int getSize() {
return 4 + 4;
@@ -366,6 +456,8 @@ public abstract class TsPrimitiveType implements Serializable {
private double value;
+ public TsDouble() {}
+
public TsDouble(double value) {
this.value = value;
}
@@ -380,6 +472,19 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Double)) { // reject first so a valid value never reaches the throw
+ throw new UnSupportedDataTypeException("TsDouble can only be set Double value");
+ }
+ setDouble((Double) val);
+ }
+
+ @Override
+ public void reset() {
+ value = 0.0;
+ }
+
@Override
public int getSize() {
return 4 + 8;
@@ -422,6 +527,8 @@ public abstract class TsPrimitiveType implements Serializable {
private Binary value;
+ public TsBinary() {}
+
public TsBinary(Binary value) {
this.value = value;
}
@@ -436,6 +543,19 @@ public abstract class TsPrimitiveType implements Serializable {
this.value = val;
}
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof Binary)) { // reject first so a valid value never reaches the throw
+ throw new UnSupportedDataTypeException("TsBinary can only be set Binary value");
+ }
+ setBinary((Binary) val);
+ }
+
+ @Override
+ public void reset() {
+ value = null;
+ }
+
@Override
public int getSize() {
return 4 + 4 + value.getLength();
@@ -478,6 +598,8 @@ public abstract class TsPrimitiveType implements Serializable {
private TsPrimitiveType[] values;
+ public TsVector() {}
+
public TsVector(TsPrimitiveType[] values) {
this.values = values;
}
@@ -492,6 +614,19 @@ public abstract class TsPrimitiveType implements Serializable {
this.values = vals;
}
+ @Override
+ public void setObject(Object val) {
+ if (!(val instanceof TsPrimitiveType[])) { // reject first so a valid value never reaches the throw
+ throw new UnSupportedDataTypeException("TsVector can only be set TsPrimitiveType[] value");
+ }
+ setVector((TsPrimitiveType[]) val);
+ }
+
+ @Override
+ public void reset() {
+ values = null;
+ }
+
@Override
public int getSize() {
int size = 0;