You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/23 11:46:15 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-2192] Support extreme function (#4626)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new c5531cb [To rel/0.12] [IOTDB-2192] Support extreme function (#4626)
c5531cb is described below
commit c5531cb7f8c418ce90994cc21db6036fddac0b31
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Thu Dec 23 19:45:33 2021 +0800
[To rel/0.12] [IOTDB-2192] Support extreme function (#4626)
---
.../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 5 +
.../query/groupby/MergeGroupByExecutorTest.java | 8 +-
.../query/groupby/RemoteGroupByExecutorTest.java | 8 +-
.../cluster/server/member/DataGroupMemberTest.java | 2 +-
docs/UserGuide/Appendix/SQL-Reference.md | 11 ++
.../DML-Data-Manipulation-Language.md | 4 +-
docs/zh/UserGuide/Appendix/SQL-Reference.md | 8 +
.../DML-Data-Manipulation-Language.md | 4 +-
.../apache/iotdb/db/qp/constant/SQLConstant.java | 13 +-
.../db/query/aggregation/AggregationType.java | 8 +-
.../query/aggregation/impl/ExtremeAggrResult.java | 173 +++++++++++++++++++++
.../db/query/factory/AggregateResultFactory.java | 20 +--
.../org/apache/iotdb/db/constant/TestConstant.java | 4 +
.../aggregation/IoTDBAggregationIT.java | 61 ++++++++
.../aggregation/IoTDBAggregationLargeDataIT.java | 64 ++++++++
.../aggregation/IoTDBAggregationSmallDataIT.java | 40 +++++
.../db/query/aggregation/AggregateResultTest.java | 25 +++
17 files changed, 430 insertions(+), 28 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index 0062431..3e3c11e 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -158,6 +158,7 @@ functionName
| FIRST_VALUE
| SUM
| LAST_VALUE
+ | EXTREME
;
functionAsClause
@@ -1143,6 +1144,10 @@ MAX_VALUE
: M A X UNDERLINE V A L U E
;
+EXTREME
+ : E X T R E M E
+ ;
+
AVG
: A V G
;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
index aa5bb34..dba649c 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
@@ -70,11 +70,11 @@ public class MergeGroupByExecutorTest extends BaseQueryTest {
Object[] answers;
List<AggregateResult> aggregateResults;
- answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
+ answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0, 4.0};
aggregateResults = groupByExecutor.calcResult(0, 5);
checkAggregations(aggregateResults, answers);
- answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+ answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0, 9.0};
aggregateResults = groupByExecutor.calcResult(5, 10);
checkAggregations(aggregateResults, answers);
} finally {
@@ -109,11 +109,11 @@ public class MergeGroupByExecutorTest extends BaseQueryTest {
Object[] answers;
List<AggregateResult> aggregateResults;
- answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
+ answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0, 4.0};
aggregateResults = groupByExecutor.calcResult(0, 5);
checkAggregations(aggregateResults, answers);
- answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+ answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0, 9.0};
aggregateResults = groupByExecutor.calcResult(5, 10);
checkAggregations(aggregateResults, answers);
} finally {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
index a89e811..5994eb8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
@@ -79,11 +79,11 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
if (groupByExecutors.size() == 1) {
// a series is only managed by one group
List<AggregateResult> aggregateResults;
- answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
+ answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0, 4.0};
aggregateResults = groupByExecutor.calcResult(0, 5);
checkAggregations(aggregateResults, answers);
- answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+ answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0, 9.0};
aggregateResults = groupByExecutor.calcResult(5, 10);
checkAggregations(aggregateResults, answers);
} else {
@@ -141,11 +141,11 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
if (groupByExecutors.size() == 1) {
// a series is only managed by one group
List<AggregateResult> aggregateResults;
- answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
+ answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0, 4.0};
aggregateResults = groupByExecutor.calcResult(0, 5);
checkAggregations(aggregateResults, answers);
- answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+ answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0, 9.0};
aggregateResults = groupByExecutor.calcResult(5, 10);
checkAggregations(aggregateResults, answers);
} else {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index d2a66ad..ceff919 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -1149,7 +1149,7 @@ public class DataGroupMemberTest extends BaseMember {
for (ByteBuffer byteBuffer : byteBuffers) {
aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer));
}
- answers = new Object[] {15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0};
+ answers = new Object[] {15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0, 19.0};
checkAggregates(answers, aggregateResults);
} finally {
dataGroupMember.closeLogManager();
diff --git a/docs/UserGuide/Appendix/SQL-Reference.md b/docs/UserGuide/Appendix/SQL-Reference.md
index 6bf4e6d..5f7c1e8 100644
--- a/docs/UserGuide/Appendix/SQL-Reference.md
+++ b/docs/UserGuide/Appendix/SQL-Reference.md
@@ -953,6 +953,17 @@ Eg. SELECT MAX_VALUE(status), MAX_VALUE(temperature) FROM root.ln.wf01.wt01 WHER
Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
```
+* EXTREME
+
+The EXTREME function returns the extreme value of the chosen timeseries (one or more). The timeseries must be int32, int64, float, or double type.
+Extreme value: the value that has the maximum absolute value.
+If a positive value and a negative value share the same maximum absolute value, the positive value is returned.
+```
+SELECT EXTREME (Path) (COMMA EXTREME (Path))* FROM <FromClause> [WHERE <WhereClause>]?
+Eg. SELECT EXTREME(status), EXTREME(temperature) FROM root.ln.wf01.wt01 WHERE root.ln.wf01.wt01.temperature < 24
+Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
+```
+
* AVG(Rename from `MEAN` at `V0.9.0`)
The AVG function returns the arithmetic mean value of the choosen timeseries over a specified period of time. The timeseries must be int32, int64, float, double type, and the other types are not to be calculated. The result is a double type number.
diff --git a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index c39909f..87df2cd 100644
--- a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -456,8 +456,8 @@ Total line number = 1
It costs 0.013s
```
-All supported aggregation functions are: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value.
-When using four aggregations: sum, avg, min_value and max_value, please make sure all the aggregated series have exactly the same data type.
+All supported aggregation functions are: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value, extreme.
+When using five aggregations: sum, avg, min_value, max_value and extreme, please make sure all the aggregated series have exactly the same data type.
Otherwise, it will generate a syntax error.
### Down-Frequency Aggregate Query
diff --git a/docs/zh/UserGuide/Appendix/SQL-Reference.md b/docs/zh/UserGuide/Appendix/SQL-Reference.md
index 7cfffb4..368aa46 100644
--- a/docs/zh/UserGuide/Appendix/SQL-Reference.md
+++ b/docs/zh/UserGuide/Appendix/SQL-Reference.md
@@ -933,6 +933,14 @@ Eg. SELECT MAX_VALUE(status), MAX_VALUE(temperature) FROM root.ln.wf01.wt01 WHER
Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
```
+* EXTREME
+ 极值:具有最大绝对值的值(正值优先)
+```
+SELECT EXTREME (Path) (COMMA EXTREME (Path))* FROM <FromClause> [WHERE <WhereClause>]?
+Eg. SELECT EXTREME(status), EXTREME(temperature) FROM root.ln.wf01.wt01 WHERE root.ln.wf01.wt01.temperature < 24
+Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
+```
+
* AVG
原有的 `MEAN` 方法在 `v0.9.0` 版本更名为 `AVG`。
diff --git a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index 0a6eba2..7738ded 100644
--- a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -593,9 +593,9 @@ Total line number = 1
It costs 0.013s
```
-分层聚合查询也可被用于其他聚合函数,当前所支持的聚合函数为:count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value
+分层聚合查询也可被用于其他聚合函数,当前所支持的聚合函数为:count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value, extreme
-对于sum, avg, min_value, max_value四种聚合函数,需保证所有聚合的时间序列数据类型相同。其他聚合函数没有此限制。
+对于sum, avg, min_value, max_value, extreme五种聚合函数,需保证所有聚合的时间序列数据类型相同。其他聚合函数没有此限制。
#### 时间区间和路径层级分组聚合查询
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 4b6e79c7..4d84a92 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -61,6 +61,8 @@ public class SQLConstant {
public static final String MAX_VALUE = "max_value";
public static final String MIN_VALUE = "min_value";
+ public static final String EXTREME = "extreme";
+
public static final String FIRST_VALUE = "first_value";
public static final String LAST_VALUE = "last_value";
@@ -75,7 +77,16 @@ public class SQLConstant {
private static final Set<String> NATIVE_FUNCTION_NAMES =
new HashSet<>(
Arrays.asList(
- MIN_TIME, MAX_TIME, MIN_VALUE, MAX_VALUE, FIRST_VALUE, LAST_VALUE, COUNT, SUM, AVG));
+ MIN_TIME,
+ MAX_TIME,
+ MIN_VALUE,
+ MAX_VALUE,
+ EXTREME,
+ FIRST_VALUE,
+ LAST_VALUE,
+ COUNT,
+ SUM,
+ AVG));
public static final int KW_AND = 1;
public static final int KW_OR = 2;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
index 31b9834..a3c6518 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
@@ -34,7 +34,8 @@ public enum AggregationType {
MAX_TIME,
MIN_TIME,
MAX_VALUE,
- MIN_VALUE;
+ MIN_VALUE,
+ EXTREME;
/**
* give an integer to return a data type.
@@ -62,6 +63,8 @@ public enum AggregationType {
return MAX_VALUE;
case 8:
return MIN_VALUE;
+ case 9:
+ return EXTREME;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + i);
}
@@ -97,6 +100,9 @@ public enum AggregationType {
case MIN_VALUE:
i = 8;
break;
+ case EXTREME:
+ i = 9;
+ break;
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + this.name());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
new file mode 100644
index 0000000..785986a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ExtremeAggrResult extends AggregateResult {
+
+ // timestamp of current value
+ protected long timestamp = Long.MIN_VALUE;
+
+ public ExtremeAggrResult(TSDataType dataType) {
+ super(dataType, AggregationType.EXTREME);
+ reset();
+ }
+
+ public Object getAbsValue(Object v) {
+ switch (resultDataType) {
+ case DOUBLE:
+ return Math.abs((Double) v);
+ case FLOAT:
+ return Math.abs((Float) v);
+ case INT32:
+ return Math.abs((Integer) v);
+ case INT64:
+ return Math.abs((Long) v);
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(resultDataType));
+ }
+ }
+
+ public Comparable<Object> getExtremeValue(
+ Comparable<Object> extVal, Comparable<Object> currentValue) {
+ if (currentValue != null) {
+ Comparable<Object> absCurrentValue = (Comparable<Object>) getAbsValue(currentValue);
+ if (extVal == null) {
+ extVal = currentValue;
+ } else {
+ Comparable<Object> absExtVal = (Comparable<Object>) getAbsValue(extVal);
+ if (absExtVal.compareTo(absCurrentValue) < 0
+ || (absExtVal.compareTo(absCurrentValue) == 0 && extVal.compareTo(currentValue) < 0)) {
+ extVal = currentValue;
+ }
+ }
+ }
+ return extVal;
+ }
+
+ private void updateResult(Comparable<Object> extVal) {
+ if (extVal == null) {
+ return;
+ }
+
+ Comparable<Object> absExtVal = (Comparable<Object>) getAbsValue(extVal);
+ Comparable<Object> candidateResult = (Comparable<Object>) getValue();
+ Comparable<Object> absCandidateResult = (Comparable<Object>) getAbsValue(getValue());
+
+ if (!hasCandidateResult()
+ || (absExtVal.compareTo(absCandidateResult) > 0
+ || (absExtVal.compareTo(absCandidateResult) == 0
+ && extVal.compareTo(candidateResult) > 0))) {
+ setValue(extVal);
+ }
+ }
+
+ @Override
+ public Object getResult() {
+ return hasCandidateResult() ? getValue() : null;
+ }
+
+ @Override
+ public void updateResultFromStatistics(Statistics statistics) throws QueryProcessException {
+ Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue();
+ Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
+
+ Comparable<Object> absMaxVal = (Comparable<Object>) getAbsValue(maxVal);
+ Comparable<Object> absMinVal = (Comparable<Object>) getAbsValue(minVal);
+
+ Comparable<Object> extVal = absMaxVal.compareTo(absMinVal) >= 0 ? maxVal : minVal;
+ updateResult(extVal);
+ }
+
+ @Override
+ public void updateResultFromPageData(BatchData dataInThisPage)
+ throws IOException, QueryProcessException {
+ updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+ }
+
+ @Override
+ public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound)
+ throws IOException {
+ Comparable<Object> extVal = null;
+
+ while (dataInThisPage.hasCurrent()
+ && dataInThisPage.currentTime() < maxBound
+ && dataInThisPage.currentTime() >= minBound) {
+
+ extVal = getExtremeValue(extVal, (Comparable<Object>) dataInThisPage.currentValue());
+ dataInThisPage.next();
+ }
+ updateResult(extVal);
+ }
+
+ @Override
+ public void updateResultUsingTimestamps(
+ long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+ Comparable<Object> extVal = null;
+
+ Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+ for (int i = 0; i < length; i++) {
+ extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+ }
+ updateResult(extVal);
+ }
+
+ @Override
+ public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+ Comparable<Object> extVal = null;
+ for (int i = 0; i < length; i++) {
+ extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+ }
+ updateResult(extVal);
+ }
+
+ @Override
+ public boolean hasFinalResult() {
+ return false;
+ }
+
+ @Override
+ public void merge(AggregateResult another) {
+ this.updateResult((Comparable<Object>) another.getResult());
+ }
+
+ @Override
+ protected void deserializeSpecificFields(ByteBuffer buffer) {
+ timestamp = buffer.getLong();
+ }
+
+ @Override
+ protected void serializeSpecificFields(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(timestamp, outputStream);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
index 6b83f48..987cdcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
@@ -22,19 +22,7 @@ package org.apache.iotdb.db.query.factory;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.db.query.aggregation.impl.AvgAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.CountAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.FirstValueAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.FirstValueDescAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.LastValueAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.LastValueDescAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MaxTimeDescAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MinTimeDescAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.SumAggrResult;
+import org.apache.iotdb.db.query.aggregation.impl.*;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
/** Easy factory pattern to build AggregateFunction. */
@@ -63,6 +51,8 @@ public class AggregateResultFactory {
return new MinValueAggrResult(dataType);
case SQLConstant.MAX_VALUE:
return new MaxValueAggrResult(dataType);
+ case SQLConstant.EXTREME:
+ return new ExtremeAggrResult(dataType);
case SQLConstant.COUNT:
return new CountAggrResult();
case SQLConstant.AVG:
@@ -96,6 +86,8 @@ public class AggregateResultFactory {
return new MinValueAggrResult(dataType);
case SQLConstant.MAX_VALUE:
return new MaxValueAggrResult(dataType);
+ case SQLConstant.EXTREME:
+ return new ExtremeAggrResult(dataType);
case SQLConstant.COUNT:
return new CountAggrResult();
case SQLConstant.AVG:
@@ -136,6 +128,8 @@ public class AggregateResultFactory {
return new MaxValueAggrResult(dataType);
case MIN_VALUE:
return new MinValueAggrResult(dataType);
+ case EXTREME:
+ return new ExtremeAggrResult(dataType);
default:
throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType.name());
}
diff --git a/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java b/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
index 61db9f8..7621040 100644
--- a/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
+++ b/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
@@ -105,6 +105,10 @@ public class TestConstant {
return String.format("min_value(%s)", path);
}
+ public static String extreme(String path) {
+ return String.format("extreme(%s)", path);
+ }
+
public static String recordToInsert(TSRecord record) {
StringBuilder measurements = new StringBuilder();
StringBuilder values = new StringBuilder();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
index bfdfbf2..ab56560 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
@@ -38,6 +38,7 @@ import java.util.Locale;
import static org.apache.iotdb.db.constant.TestConstant.avg;
import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.extreme;
import static org.apache.iotdb.db.constant.TestConstant.first_value;
import static org.apache.iotdb.db.constant.TestConstant.last_value;
import static org.apache.iotdb.db.constant.TestConstant.max_time;
@@ -965,6 +966,66 @@ public class IoTDBAggregationIT {
}
}
+ @Test
+ public void extremeTest() {
+ String[] retArray = new String[] {"0,8499", "0,2499"};
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "SELECT extreme(s0) FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000");
+
+ Assert.assertTrue(hasResultSet);
+ int cnt;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(extreme(d0s0));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ }
+
+ hasResultSet = statement.execute("SELECT extreme(s0) FROM root.vehicle.d0 WHERE time < 2500");
+
+ Assert.assertTrue(hasResultSet);
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(extreme(d0s0));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(2, cnt);
+ }
+
+ // keep the correctness of `order by time desc`
+ hasResultSet =
+ statement.execute(
+ "SELECT extreme(s0) "
+ + "FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000 order by time desc");
+
+ Assert.assertTrue(hasResultSet);
+ cnt = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(extreme(d0s0));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
private void prepareData() {
try (Connection connection =
DriverManager.getConnection(
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationLargeDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationLargeDataIT.java
index 294f02e..409206f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationLargeDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationLargeDataIT.java
@@ -36,6 +36,7 @@ import java.sql.Statement;
import static org.apache.iotdb.db.constant.TestConstant.avg;
import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.extreme;
import static org.apache.iotdb.db.constant.TestConstant.first_value;
import static org.apache.iotdb.db.constant.TestConstant.last_value;
import static org.apache.iotdb.db.constant.TestConstant.max_time;
@@ -143,6 +144,7 @@ public class IoTDBAggregationLargeDataIT {
minTimeAggreWithSingleFilterTest();
minValueAggreWithSingleFilterTest();
maxValueAggreWithSingleFilterTest();
+ extremeAggreWithSingleFilterTest();
countAggreWithMultiFilterTest();
maxTimeAggreWithMultiFilterTest();
@@ -988,4 +990,66 @@ public class IoTDBAggregationLargeDataIT {
fail(e.getMessage());
}
}
+
+ private void extremeAggreWithSingleFilterTest() {
+ String[] retArray = new String[] {"0,99,40000,122.0"};
+
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select extreme(s0),extreme(s1),extreme(s2)"
+ + " from root.vehicle.d0 "
+ + "where s1 < 50000 and s1 != 100");
+
+ if (hasResultSet) {
+ int cnt = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(extreme(d0s0))
+ + ","
+ + resultSet.getString(extreme(d0s1))
+ + ","
+ + resultSet.getString(extreme(d0s2));
+ Assert.assertEquals(ans, retArray[cnt]);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ }
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select extreme(s0),extreme(s1),extreme(s2)"
+ + " from root.vehicle.d0 "
+ + "where s1 < 50000 and s1 != 100 order by time desc");
+
+ if (hasResultSet) {
+ int cnt = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(extreme(d0s0))
+ + ","
+ + resultSet.getString(extreme(d0s1))
+ + ","
+ + resultSet.getString(extreme(d0s2));
+ Assert.assertEquals(ans, retArray[cnt]);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
index cca305d..cf49672 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
@@ -37,6 +37,7 @@ import java.sql.Statement;
import static org.apache.iotdb.db.constant.TestConstant.avg;
import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.extreme;
import static org.apache.iotdb.db.constant.TestConstant.first_value;
import static org.apache.iotdb.db.constant.TestConstant.last_value;
import static org.apache.iotdb.db.constant.TestConstant.max_time;
@@ -755,6 +756,45 @@ public class IoTDBAggregationSmallDataIT {
}
}
+ @Test
+ public void extremeWithoutFilterTest() throws ClassNotFoundException {
+ String[] retArray = new String[] {"0,22222,null"};
+
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ try {
+ statement.execute("SELECT extreme(d0.s0),extreme(d1.s1),extreme(d0.s3) FROM root.vehicle");
+ fail();
+ } catch (IoTDBSQLException e) {
+ Assert.assertTrue(e.toString().contains("Binary statistics does not support: max"));
+ }
+
+ boolean hasResultSet =
+ statement.execute("SELECT extreme(d0.s0),extreme(d1.s1) FROM root.vehicle");
+ Assert.assertTrue(hasResultSet);
+ int cnt = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(extreme(d0s0))
+ + ","
+ + resultSet.getString(extreme(d1s1));
+ Assert.assertEquals(retArray[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
public static void insertSQL() throws ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
try (Connection connection =
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
index 3a80236..d421aa4 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
@@ -266,4 +266,29 @@ public class AggregateResultTest {
AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
Assert.assertEquals(2d, (double) result.getResult(), 0.01);
}
+
+ @Test
+ public void ExtremeAggrResultTest() throws QueryProcessException, IOException {
+ AggregateResult extremeAggrResult1 =
+ AggregateResultFactory.getAggrResultByName(SQLConstant.EXTREME, TSDataType.DOUBLE, true);
+ AggregateResult extremeAggrResult2 =
+ AggregateResultFactory.getAggrResultByName(SQLConstant.EXTREME, TSDataType.DOUBLE, true);
+
+ Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+ statistics1.update(1L, 1d);
+ statistics2.update(1L, -2d);
+
+ extremeAggrResult1.updateResultFromStatistics(statistics1);
+ extremeAggrResult2.updateResultFromStatistics(statistics2);
+ extremeAggrResult1.merge(extremeAggrResult2);
+
+ Assert.assertEquals(-2d, (double) extremeAggrResult1.getResult(), 0.01);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ extremeAggrResult1.serializeTo(outputStream);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+ AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
+ Assert.assertEquals(-2d, (double) result.getResult(), 0.01);
+ }
}