You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/12/23 11:46:15 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-2192] Support extreme function (#4626)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new c5531cb  [To rel/0.12] [IOTDB-2192] Support extreme function (#4626)
c5531cb is described below

commit c5531cb7f8c418ce90994cc21db6036fddac0b31
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Thu Dec 23 19:45:33 2021 +0800

    [To rel/0.12] [IOTDB-2192] Support extreme function (#4626)
---
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |   5 +
 .../query/groupby/MergeGroupByExecutorTest.java    |   8 +-
 .../query/groupby/RemoteGroupByExecutorTest.java   |   8 +-
 .../cluster/server/member/DataGroupMemberTest.java |   2 +-
 docs/UserGuide/Appendix/SQL-Reference.md           |  11 ++
 .../DML-Data-Manipulation-Language.md              |   4 +-
 docs/zh/UserGuide/Appendix/SQL-Reference.md        |   8 +
 .../DML-Data-Manipulation-Language.md              |   4 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  13 +-
 .../db/query/aggregation/AggregationType.java      |   8 +-
 .../query/aggregation/impl/ExtremeAggrResult.java  | 173 +++++++++++++++++++++
 .../db/query/factory/AggregateResultFactory.java   |  20 +--
 .../org/apache/iotdb/db/constant/TestConstant.java |   4 +
 .../aggregation/IoTDBAggregationIT.java            |  61 ++++++++
 .../aggregation/IoTDBAggregationLargeDataIT.java   |  64 ++++++++
 .../aggregation/IoTDBAggregationSmallDataIT.java   |  40 +++++
 .../db/query/aggregation/AggregateResultTest.java  |  25 +++
 17 files changed, 430 insertions(+), 28 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index 0062431..3e3c11e 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -158,6 +158,7 @@ functionName
     | FIRST_VALUE
     | SUM
     | LAST_VALUE
+    | EXTREME
     ;
 
 functionAsClause
@@ -1143,6 +1144,10 @@ MAX_VALUE
     : M A X UNDERLINE V A L U E
     ;
 
+EXTREME
+    : E X T R E M E
+    ;
+
 AVG
     : A V G
     ;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
index aa5bb34..dba649c 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
@@ -70,11 +70,11 @@ public class MergeGroupByExecutorTest extends BaseQueryTest {
 
       Object[] answers;
       List<AggregateResult> aggregateResults;
-      answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
+      answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0, 4.0};
       aggregateResults = groupByExecutor.calcResult(0, 5);
       checkAggregations(aggregateResults, answers);
 
-      answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+      answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0, 9.0};
       aggregateResults = groupByExecutor.calcResult(5, 10);
       checkAggregations(aggregateResults, answers);
     } finally {
@@ -109,11 +109,11 @@ public class MergeGroupByExecutorTest extends BaseQueryTest {
 
       Object[] answers;
       List<AggregateResult> aggregateResults;
-      answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
+      answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0, 4.0};
       aggregateResults = groupByExecutor.calcResult(0, 5);
       checkAggregations(aggregateResults, answers);
 
-      answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+      answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0, 9.0};
       aggregateResults = groupByExecutor.calcResult(5, 10);
       checkAggregations(aggregateResults, answers);
     } finally {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
index a89e811..5994eb8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
@@ -79,11 +79,11 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
         if (groupByExecutors.size() == 1) {
           // a series is only managed by one group
           List<AggregateResult> aggregateResults;
-          answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0};
+          answers = new Object[] {5.0, 2.0, 10.0, 0.0, 4.0, 4.0, 0.0, 4.0, 0.0, 4.0};
           aggregateResults = groupByExecutor.calcResult(0, 5);
           checkAggregations(aggregateResults, answers);
 
-          answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+          answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0, 9.0};
           aggregateResults = groupByExecutor.calcResult(5, 10);
           checkAggregations(aggregateResults, answers);
         } else {
@@ -141,11 +141,11 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
         if (groupByExecutors.size() == 1) {
           // a series is only managed by one group
           List<AggregateResult> aggregateResults;
-          answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0};
+          answers = new Object[] {2.0, 3.5, 7.0, 3.0, 4.0, 4.0, 3.0, 4.0, 3.0, 4.0};
           aggregateResults = groupByExecutor.calcResult(0, 5);
           checkAggregations(aggregateResults, answers);
 
-          answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0};
+          answers = new Object[] {5.0, 7.0, 35.0, 5.0, 9.0, 9.0, 5.0, 9.0, 5.0, 9.0};
           aggregateResults = groupByExecutor.calcResult(5, 10);
           checkAggregations(aggregateResults, answers);
         } else {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index d2a66ad..ceff919 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -1149,7 +1149,7 @@ public class DataGroupMemberTest extends BaseMember {
         for (ByteBuffer byteBuffer : byteBuffers) {
           aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer));
         }
-        answers = new Object[] {15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0};
+        answers = new Object[] {15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0, 19.0};
         checkAggregates(answers, aggregateResults);
       } finally {
         dataGroupMember.closeLogManager();
diff --git a/docs/UserGuide/Appendix/SQL-Reference.md b/docs/UserGuide/Appendix/SQL-Reference.md
index 6bf4e6d..5f7c1e8 100644
--- a/docs/UserGuide/Appendix/SQL-Reference.md
+++ b/docs/UserGuide/Appendix/SQL-Reference.md
@@ -953,6 +953,17 @@ Eg. SELECT MAX_VALUE(status), MAX_VALUE(temperature) FROM root.ln.wf01.wt01 WHER
 Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
 ```
 
+* EXTREME
+
+The EXTREME function returns the extreme value (compared by absolute value) of the chosen timeseries (one or more).
+extreme value: The value that has the maximum absolute value.
+If a positive value and a negative value share the same maximum absolute value, the positive value is returned.
+```
+SELECT EXTREME (Path) (COMMA EXTREME (Path))* FROM <FromClause> [WHERE <WhereClause>]?
+Eg. SELECT EXTREME(status), EXTREME(temperature) FROM root.ln.wf01.wt01 WHERE root.ln.wf01.wt01.temperature < 24
+Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
+```
+
 * AVG(Rename from `MEAN` at `V0.9.0`)
 
 The AVG function returns the arithmetic mean value of the choosen timeseries over a specified period of time. The timeseries must be int32, int64, float, double type, and the other types are not to be calculated. The result is a double type number.
diff --git a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index c39909f..87df2cd 100644
--- a/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -456,8 +456,8 @@ Total line number = 1
 It costs 0.013s
 ```
 
-All supported aggregation functions are: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value.
-When using four aggregations: sum, avg, min_value and max_value, please make sure all the aggregated series have exactly the same data type.
+All supported aggregation functions are: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value, extreme.
+When using the five aggregations sum, avg, min_value, max_value and extreme, please make sure all the aggregated series have exactly the same data type.
 Otherwise, it will generate a syntax error.
 
 ### Down-Frequency Aggregate Query
diff --git a/docs/zh/UserGuide/Appendix/SQL-Reference.md b/docs/zh/UserGuide/Appendix/SQL-Reference.md
index 7cfffb4..368aa46 100644
--- a/docs/zh/UserGuide/Appendix/SQL-Reference.md
+++ b/docs/zh/UserGuide/Appendix/SQL-Reference.md
@@ -933,6 +933,14 @@ Eg. SELECT MAX_VALUE(status), MAX_VALUE(temperature) FROM root.ln.wf01.wt01 WHER
 Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
 ```
 
+* EXTREME
+  极值:具有最大绝对值的值(正值优先)
+```
+SELECT EXTREME (Path) (COMMA EXTREME (Path))* FROM <FromClause> [WHERE <WhereClause>]?
+Eg. SELECT EXTREME(status), EXTREME(temperature) FROM root.ln.wf01.wt01 WHERE root.ln.wf01.wt01.temperature < 24
+Note: the statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
+```
+
 * AVG
 原有的 `MEAN` 方法在 `v0.9.0` 版本更名为 `AVG`。
 
diff --git a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
index 0a6eba2..7738ded 100644
--- a/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
+++ b/docs/zh/UserGuide/IoTDB-SQL-Language/DML-Data-Manipulation-Language.md
@@ -593,9 +593,9 @@ Total line number = 1
 It costs 0.013s
 ```
 
-分层聚合查询也可被用于其他聚合函数,当前所支持的聚合函数为:count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value
+分层聚合查询也可被用于其他聚合函数,当前所支持的聚合函数为:count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value, extreme
 
-对于sum, avg, min_value, max_value四种聚合函数,需保证所有聚合的时间序列数据类型相同。其他聚合函数没有此限制。
+对于sum, avg, min_value, max_value, extreme五种聚合函数,需保证所有聚合的时间序列数据类型相同。其他聚合函数没有此限制。
 
 #### 时间区间和路径层级分组聚合查询
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index 4b6e79c7..4d84a92 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -61,6 +61,8 @@ public class SQLConstant {
   public static final String MAX_VALUE = "max_value";
   public static final String MIN_VALUE = "min_value";
 
+  public static final String EXTREME = "extreme";
+
   public static final String FIRST_VALUE = "first_value";
   public static final String LAST_VALUE = "last_value";
 
@@ -75,7 +77,16 @@ public class SQLConstant {
   private static final Set<String> NATIVE_FUNCTION_NAMES =
       new HashSet<>(
           Arrays.asList(
-              MIN_TIME, MAX_TIME, MIN_VALUE, MAX_VALUE, FIRST_VALUE, LAST_VALUE, COUNT, SUM, AVG));
+              MIN_TIME,
+              MAX_TIME,
+              MIN_VALUE,
+              MAX_VALUE,
+              EXTREME,
+              FIRST_VALUE,
+              LAST_VALUE,
+              COUNT,
+              SUM,
+              AVG));
 
   public static final int KW_AND = 1;
   public static final int KW_OR = 2;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
index 31b9834..a3c6518 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregationType.java
@@ -34,7 +34,8 @@ public enum AggregationType {
   MAX_TIME,
   MIN_TIME,
   MAX_VALUE,
-  MIN_VALUE;
+  MIN_VALUE,
+  EXTREME;
 
   /**
    * give an integer to return a data type.
@@ -62,6 +63,8 @@ public enum AggregationType {
         return MAX_VALUE;
       case 8:
         return MIN_VALUE;
+      case 9:
+        return EXTREME;
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + i);
     }
@@ -97,6 +100,9 @@ public enum AggregationType {
       case MIN_VALUE:
         i = 8;
         break;
+      case EXTREME:
+        i = 9;
+        break;
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + this.name());
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
new file mode 100644
index 0000000..785986a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.aggregation.impl;
+
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ExtremeAggrResult extends AggregateResult {
+
+  // timestamp of current value
+  protected long timestamp = Long.MIN_VALUE;
+
+  public ExtremeAggrResult(TSDataType dataType) {
+    super(dataType, AggregationType.EXTREME);
+    reset();
+  }
+
+  public Object getAbsValue(Object v) {
+    switch (resultDataType) {
+      case DOUBLE:
+        return Math.abs((Double) v);
+      case FLOAT:
+        return Math.abs((Float) v);
+      case INT32:
+        return Math.abs((Integer) v);
+      case INT64:
+        return Math.abs((Long) v);
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(resultDataType));
+    }
+  }
+
+  public Comparable<Object> getExtremeValue(
+      Comparable<Object> extVal, Comparable<Object> currentValue) {
+    if (currentValue != null) {
+      Comparable<Object> absCurrentValue = (Comparable<Object>) getAbsValue(currentValue);
+      if (extVal == null) {
+        extVal = currentValue;
+      } else {
+        Comparable<Object> absExtVal = (Comparable<Object>) getAbsValue(extVal);
+        if (absExtVal.compareTo(absCurrentValue) < 0
+            || (absExtVal.compareTo(absCurrentValue) == 0 && extVal.compareTo(currentValue) < 0)) {
+          extVal = currentValue;
+        }
+      }
+    }
+    return extVal;
+  }
+
+  private void updateResult(Comparable<Object> extVal) {
+    if (extVal == null) {
+      return;
+    }
+
+    Comparable<Object> absExtVal = (Comparable<Object>) getAbsValue(extVal);
+    Comparable<Object> candidateResult = (Comparable<Object>) getValue();
+    Comparable<Object> absCandidateResult = (Comparable<Object>) getAbsValue(getValue());
+
+    if (!hasCandidateResult()
+        || (absExtVal.compareTo(absCandidateResult) > 0
+            || (absExtVal.compareTo(absCandidateResult) == 0
+                && extVal.compareTo(candidateResult) > 0))) {
+      setValue(extVal);
+    }
+  }
+
+  @Override
+  public Object getResult() {
+    return hasCandidateResult() ? getValue() : null;
+  }
+
+  @Override
+  public void updateResultFromStatistics(Statistics statistics) throws QueryProcessException {
+    Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue();
+    Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
+
+    Comparable<Object> absMaxVal = (Comparable<Object>) getAbsValue(maxVal);
+    Comparable<Object> absMinVal = (Comparable<Object>) getAbsValue(minVal);
+
+    Comparable<Object> extVal = absMaxVal.compareTo(absMinVal) >= 0 ? maxVal : minVal;
+    updateResult(extVal);
+  }
+
+  @Override
+  public void updateResultFromPageData(BatchData dataInThisPage)
+      throws IOException, QueryProcessException {
+    updateResultFromPageData(dataInThisPage, Long.MIN_VALUE, Long.MAX_VALUE);
+  }
+
+  @Override
+  public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound)
+      throws IOException {
+    Comparable<Object> extVal = null;
+
+    while (dataInThisPage.hasCurrent()
+        && dataInThisPage.currentTime() < maxBound
+        && dataInThisPage.currentTime() >= minBound) {
+
+      extVal = getExtremeValue(extVal, (Comparable<Object>) dataInThisPage.currentValue());
+      dataInThisPage.next();
+    }
+    updateResult(extVal);
+  }
+
+  @Override
+  public void updateResultUsingTimestamps(
+      long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException {
+    Comparable<Object> extVal = null;
+
+    Object[] values = dataReader.getValuesInTimestamps(timestamps, length);
+    for (int i = 0; i < length; i++) {
+      extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+    }
+    updateResult(extVal);
+  }
+
+  @Override
+  public void updateResultUsingValues(long[] timestamps, int length, Object[] values) {
+    Comparable<Object> extVal = null;
+    for (int i = 0; i < length; i++) {
+      extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]);
+    }
+    updateResult(extVal);
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public void merge(AggregateResult another) {
+    this.updateResult((Comparable<Object>) another.getResult());
+  }
+
+  @Override
+  protected void deserializeSpecificFields(ByteBuffer buffer) {
+    timestamp = buffer.getLong();
+  }
+
+  @Override
+  protected void serializeSpecificFields(OutputStream outputStream) throws IOException {
+    ReadWriteIOUtils.write(timestamp, outputStream);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
index 6b83f48..987cdcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/factory/AggregateResultFactory.java
@@ -22,19 +22,7 @@ package org.apache.iotdb.db.query.factory;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.db.query.aggregation.impl.AvgAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.CountAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.FirstValueAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.FirstValueDescAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.LastValueAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.LastValueDescAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MaxTimeDescAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MinTimeDescAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult;
-import org.apache.iotdb.db.query.aggregation.impl.SumAggrResult;
+import org.apache.iotdb.db.query.aggregation.impl.*;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 /** Easy factory pattern to build AggregateFunction. */
@@ -63,6 +51,8 @@ public class AggregateResultFactory {
         return new MinValueAggrResult(dataType);
       case SQLConstant.MAX_VALUE:
         return new MaxValueAggrResult(dataType);
+      case SQLConstant.EXTREME:
+        return new ExtremeAggrResult(dataType);
       case SQLConstant.COUNT:
         return new CountAggrResult();
       case SQLConstant.AVG:
@@ -96,6 +86,8 @@ public class AggregateResultFactory {
         return new MinValueAggrResult(dataType);
       case SQLConstant.MAX_VALUE:
         return new MaxValueAggrResult(dataType);
+      case SQLConstant.EXTREME:
+        return new ExtremeAggrResult(dataType);
       case SQLConstant.COUNT:
         return new CountAggrResult();
       case SQLConstant.AVG:
@@ -136,6 +128,8 @@ public class AggregateResultFactory {
         return new MaxValueAggrResult(dataType);
       case MIN_VALUE:
         return new MinValueAggrResult(dataType);
+      case EXTREME:
+        return new ExtremeAggrResult(dataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation Type: " + aggregationType.name());
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java b/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
index 61db9f8..7621040 100644
--- a/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
+++ b/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
@@ -105,6 +105,10 @@ public class TestConstant {
     return String.format("min_value(%s)", path);
   }
 
+  public static String extreme(String path) {
+    return String.format("extreme(%s)", path);
+  }
+
   public static String recordToInsert(TSRecord record) {
     StringBuilder measurements = new StringBuilder();
     StringBuilder values = new StringBuilder();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
index bfdfbf2..ab56560 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationIT.java
@@ -38,6 +38,7 @@ import java.util.Locale;
 
 import static org.apache.iotdb.db.constant.TestConstant.avg;
 import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.extreme;
 import static org.apache.iotdb.db.constant.TestConstant.first_value;
 import static org.apache.iotdb.db.constant.TestConstant.last_value;
 import static org.apache.iotdb.db.constant.TestConstant.max_time;
@@ -965,6 +966,66 @@ public class IoTDBAggregationIT {
     }
   }
 
+  @Test
+  public void extremeTest() {
+    String[] retArray = new String[] {"0,8499", "0,2499"};
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT extreme(s0) FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000");
+
+      Assert.assertTrue(hasResultSet);
+      int cnt;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        cnt = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(extreme(d0s0));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+
+      hasResultSet = statement.execute("SELECT extreme(s0) FROM root.vehicle.d0 WHERE time < 2500");
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(extreme(d0s0));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(2, cnt);
+      }
+
+      // keep the correctness of `order by time desc`
+      hasResultSet =
+          statement.execute(
+              "SELECT extreme(s0) "
+                  + "FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000 order by time desc");
+
+      Assert.assertTrue(hasResultSet);
+      cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(extreme(d0s0));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
   private void prepareData() {
     try (Connection connection =
             DriverManager.getConnection(
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationLargeDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationLargeDataIT.java
index 294f02e..409206f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationLargeDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationLargeDataIT.java
@@ -36,6 +36,7 @@ import java.sql.Statement;
 
 import static org.apache.iotdb.db.constant.TestConstant.avg;
 import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.extreme;
 import static org.apache.iotdb.db.constant.TestConstant.first_value;
 import static org.apache.iotdb.db.constant.TestConstant.last_value;
 import static org.apache.iotdb.db.constant.TestConstant.max_time;
@@ -143,6 +144,7 @@ public class IoTDBAggregationLargeDataIT {
     minTimeAggreWithSingleFilterTest();
     minValueAggreWithSingleFilterTest();
     maxValueAggreWithSingleFilterTest();
+    extremeAggreWithSingleFilterTest();
 
     countAggreWithMultiFilterTest();
     maxTimeAggreWithMultiFilterTest();
@@ -988,4 +990,66 @@ public class IoTDBAggregationLargeDataIT {
       fail(e.getMessage());
     }
   }
+
+  private void extremeAggreWithSingleFilterTest() {
+    String[] retArray = new String[] {"0,99,40000,122.0"};
+
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select extreme(s0),extreme(s1),extreme(s2)"
+                  + " from root.vehicle.d0 "
+                  + "where s1 < 50000 and s1 != 100");
+
+      if (hasResultSet) {
+        int cnt = 0;
+        try (ResultSet resultSet = statement.getResultSet()) {
+          while (resultSet.next()) {
+            String ans =
+                resultSet.getString(TIMESTAMP_STR)
+                    + ","
+                    + resultSet.getString(extreme(d0s0))
+                    + ","
+                    + resultSet.getString(extreme(d0s1))
+                    + ","
+                    + resultSet.getString(extreme(d0s2));
+            Assert.assertEquals(ans, retArray[cnt]);
+            cnt++;
+          }
+          Assert.assertEquals(1, cnt);
+        }
+      }
+
+      hasResultSet =
+          statement.execute(
+              "select extreme(s0),extreme(s1),extreme(s2)"
+                  + " from root.vehicle.d0 "
+                  + "where s1 < 50000 and s1 != 100 order by time desc");
+
+      if (hasResultSet) {
+        int cnt = 0;
+        try (ResultSet resultSet = statement.getResultSet()) {
+          while (resultSet.next()) {
+            String ans =
+                resultSet.getString(TIMESTAMP_STR)
+                    + ","
+                    + resultSet.getString(extreme(d0s0))
+                    + ","
+                    + resultSet.getString(extreme(d0s1))
+                    + ","
+                    + resultSet.getString(extreme(d0s2));
+            Assert.assertEquals(ans, retArray[cnt]);
+            cnt++;
+          }
+          Assert.assertEquals(1, cnt);
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
index cca305d..cf49672 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationSmallDataIT.java
@@ -37,6 +37,7 @@ import java.sql.Statement;
 
 import static org.apache.iotdb.db.constant.TestConstant.avg;
 import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.extreme;
 import static org.apache.iotdb.db.constant.TestConstant.first_value;
 import static org.apache.iotdb.db.constant.TestConstant.last_value;
 import static org.apache.iotdb.db.constant.TestConstant.max_time;
@@ -755,6 +756,45 @@ public class IoTDBAggregationSmallDataIT {
     }
   }
 
+  @Test
+  public void extremeWithoutFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"0,22222,null"};
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      try {
+        statement.execute("SELECT extreme(d0.s0),extreme(d1.s1),extreme(d0.s3) FROM root.vehicle");
+        fail();
+      } catch (IoTDBSQLException e) {
+        Assert.assertTrue(e.toString().contains("Binary statistics does not support: max"));
+      }
+
+      boolean hasResultSet =
+          statement.execute("SELECT extreme(d0.s0),extreme(d1.s1) FROM root.vehicle");
+      Assert.assertTrue(hasResultSet);
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(extreme(d0s0))
+                  + ","
+                  + resultSet.getString(extreme(d1s1));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
   public static void insertSQL() throws ClassNotFoundException {
     Class.forName(Config.JDBC_DRIVER_NAME);
     try (Connection connection =
diff --git a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
index 3a80236..d421aa4 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/aggregation/AggregateResultTest.java
@@ -266,4 +266,29 @@ public class AggregateResultTest {
     AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
     Assert.assertEquals(2d, (double) result.getResult(), 0.01);
   }
+
+  @Test
+  public void ExtremeAggrResultTest() throws QueryProcessException, IOException {
+    AggregateResult extremeAggrResult1 =
+        AggregateResultFactory.getAggrResultByName(SQLConstant.EXTREME, TSDataType.DOUBLE, true);
+    AggregateResult extremeAggrResult2 =
+        AggregateResultFactory.getAggrResultByName(SQLConstant.EXTREME, TSDataType.DOUBLE, true);
+
+    Statistics statistics1 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    Statistics statistics2 = Statistics.getStatsByType(TSDataType.DOUBLE);
+    statistics1.update(1L, 1d);
+    statistics2.update(1L, -2d);
+
+    extremeAggrResult1.updateResultFromStatistics(statistics1);
+    extremeAggrResult2.updateResultFromStatistics(statistics2);
+    extremeAggrResult1.merge(extremeAggrResult2);
+
+    Assert.assertEquals(-2d, (double) extremeAggrResult1.getResult(), 0.01);
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    extremeAggrResult1.serializeTo(outputStream);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
+    AggregateResult result = AggregateResult.deserializeFrom(byteBuffer);
+    Assert.assertEquals(-2d, (double) result.getResult(), 0.01);
+  }
 }