You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/26 21:23:19 UTC
[arrow-datafusion] branch main updated: Add support for FIRST_VALUE, LAST_VALUE Aggregate Functions (#6445)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f54f514a49 Add support for FIRST_VALUE, LAST_VALUE Aggregate Functions (#6445)
f54f514a49 is described below
commit f54f514a4923bc49e0e1c57a4cf570aebd1b6137
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Sat May 27 00:23:13 2023 +0300
Add support for FIRST_VALUE, LAST_VALUE Aggregate Functions (#6445)
* Naive test passes
* Add new tests and simplifications
* move tests to the .slt file
* update requirement
* update tests
* Add support for order-sensitive aggregation in PartiallyOrdered mode.
* Resolve linter errors
* update comments
* minor changes
* retract changes in generated
* update proto files
* Simplifications
* Make types consistent in schema, and data
* Update todos
* Convert API to vector
* Convert get_finest to handle Vector inputs
* simplifications, update comment
* initial commit, add test
* Add support for FIRST Aggregate function.
* Add support for last aggregate
* Update cargo.lock
* Remove distinct, and limit from First and last aggregate.
* Add reverse for First and Last Aggregator
* Update cargo lock
* Minor code simplifications
* Update comment
* Update documents
* Fix projection pushdown bug
* fix projection push down failure bug
* combine first_agg and last_agg parsers
* Update documentation
* Update subproject
* initial commit
* Add test code
* initial version
* simplify prints
* minor changes
* sqllogictests pass
* All tests pass
* update proto function names
* Minor changes
* do not consider ordering requirement in ordering insensitive aggregators
* Reject aggregate order by for window functions.
* simplifications
* Fix cargo lock file
* Update comment
* Rename aggregator first and last
* minor change
* Comment improvements
* Remove count from First,Last accumulators
* Address reviews
* Remove camel to upper snake util, make aggregate function names explicit
* update the test
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
Co-authored-by: berkaysynnada <be...@synnada.ai>
---
.../core/src/physical_plan/aggregates/mod.rs | 25 +-
datafusion/core/tests/dataframe_functions.rs | 20 +-
datafusion/core/tests/sql/aggregates.rs | 4 +-
.../tests/sqllogictests/test_files/aggregate.slt | 2 +-
.../tests/sqllogictests/test_files/groupby.slt | 140 ++++++++---
.../core/tests/sqllogictests/test_files/window.slt | 25 +-
datafusion/expr/src/aggregate_function.rs | 50 +++-
datafusion/expr/src/type_coercion/aggregates.rs | 4 +-
datafusion/expr/src/window_function.rs | 44 ++--
datafusion/physical-expr/src/aggregate/build_in.rs | 10 +
.../physical-expr/src/aggregate/first_last.rs | 272 +++++++++++++++++++++
datafusion/physical-expr/src/aggregate/mod.rs | 1 +
datafusion/physical-expr/src/expressions/mod.rs | 1 +
datafusion/proto/proto/datafusion.proto | 4 +
datafusion/proto/src/generated/pbjson.rs | 6 +
datafusion/proto/src/generated/prost.rs | 8 +
datafusion/proto/src/logical_plan/from_proto.rs | 2 +
datafusion/proto/src/logical_plan/to_proto.rs | 8 +
datafusion/sql/src/expr/function.rs | 14 +-
datafusion/sql/tests/integration_test.rs | 8 +-
docs/source/user-guide/sql/aggregate_functions.md | 28 +++
21 files changed, 601 insertions(+), 75 deletions(-)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index fc722e9ac3..39e42bbd60 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -55,6 +55,7 @@ mod utils;
pub use datafusion_expr::AggregateFunction;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
+use datafusion_physical_expr::expressions::{ArrayAgg, FirstValue, LastValue};
/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -388,6 +389,16 @@ fn get_finest_requirement<
Ok(result)
}
+/// Checks whether the given aggregate expression is order-sensitive.
+/// For instance, a `SUM` aggregation doesn't depend on the order of its inputs.
+/// However, a `FirstValue` aggregation depends on the input ordering (if the order changes,
+/// the first value in the list would change).
+fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
+ aggr_expr.as_any().is::<FirstValue>()
+ || aggr_expr.as_any().is::<LastValue>()
+ || aggr_expr.as_any().is::<ArrayAgg>()
+}
+
impl AggregateExec {
/// Create a new hash aggregate execution plan
pub fn try_new(
@@ -395,7 +406,7 @@ impl AggregateExec {
group_by: PhysicalGroupBy,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
- order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,
+ mut order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
) -> Result<Self> {
@@ -413,6 +424,18 @@ impl AggregateExec {
// In other modes, all groups are collapsed, therefore their input schema
// can not contain expressions in the requirement.
if mode == AggregateMode::Partial || mode == AggregateMode::Single {
+ order_by_expr = aggr_expr
+ .iter()
+ .zip(order_by_expr.into_iter())
+ .map(|(aggr_expr, fn_reqs)| {
+ // If aggregation function is ordering sensitive, keep ordering requirement as is; otherwise ignore requirement
+ if is_order_sensitive(aggr_expr) {
+ fn_reqs
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>();
let requirement = get_finest_requirement(
&order_by_expr,
|| input.equivalence_properties(),
diff --git a/datafusion/core/tests/dataframe_functions.rs b/datafusion/core/tests/dataframe_functions.rs
index 23af19983d..88b6114309 100644
--- a/datafusion/core/tests/dataframe_functions.rs
+++ b/datafusion/core/tests/dataframe_functions.rs
@@ -155,11 +155,11 @@ async fn test_fn_approx_median() -> Result<()> {
let expr = approx_median(col("b"));
let expected = vec![
- "+----------------------+",
- "| APPROXMEDIAN(test.b) |",
- "+----------------------+",
- "| 10 |",
- "+----------------------+",
+ "+-----------------------+",
+ "| APPROX_MEDIAN(test.b) |",
+ "+-----------------------+",
+ "| 10 |",
+ "+-----------------------+",
];
let df = create_test_table().await?;
@@ -175,11 +175,11 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
let expr = approx_percentile_cont(col("b"), lit(0.5));
let expected = vec![
- "+-------------------------------------------+",
- "| APPROXPERCENTILECONT(test.b,Float64(0.5)) |",
- "+-------------------------------------------+",
- "| 10 |",
- "+-------------------------------------------+",
+ "+---------------------------------------------+",
+ "| APPROX_PERCENTILE_CONT(test.b,Float64(0.5)) |",
+ "+---------------------------------------------+",
+ "| 10 |",
+ "+---------------------------------------------+",
];
let df = create_test_table().await?;
diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index e847ea0c0e..3ff81581c0 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -29,7 +29,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
// The results for this query should be something like the following:
// +------------------------------------------+
- // | ARRAYAGG(DISTINCT aggregate_test_100.c2) |
+ // | ARRAY_AGG(DISTINCT aggregate_test_100.c2) |
// +------------------------------------------+
// | [4, 2, 3, 5, 1] |
// +------------------------------------------+
@@ -37,7 +37,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
assert_eq!(
*actual[0].schema(),
Schema::new(vec![Field::new_list(
- "ARRAYAGG(DISTINCT aggregate_test_100.c2)",
+ "ARRAY_AGG(DISTINCT aggregate_test_100.c2)",
Field::new("item", DataType::UInt32, true),
false
),])
diff --git a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
index 17d89a9f05..ab3516e9e5 100644
--- a/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/aggregate.slt
@@ -41,7 +41,7 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
#######
# https://github.com/apache/arrow-datafusion/issues/3353
-statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name "APPROXDISTINCT\(aggregate_test_100\.c9\)"
+statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name "APPROX_DISTINCT\(aggregate_test_100\.c9\)"
SELECT approx_distinct(c9) count_c9, approx_distinct(cast(c9 as varchar)) count_c9_str FROM aggregate_test_100
# csv_query_approx_percentile_cont_with_weight
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index b81731e64c..8bac60bbba 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1974,25 +1974,26 @@ query III
# test_source_sorted_groupby2
-
+# If ordering is not important for the aggregation function, we should ignore the ordering requirement. Hence
+# "ORDER BY a DESC" should have no effect.
query TT
EXPLAIN SELECT a, d,
- SUM(c) as summation1
+ SUM(c ORDER BY a DESC) as summation1
FROM annotated_data_infinite2
GROUP BY d, a
----
logical_plan
-Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.c) AS summation1
---Aggregate: groupBy=[[annotated_data_infinite2.d, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c)]]
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST] AS summation1
+--Aggregate: groupBy=[[annotated_data_infinite2.d, annotated_data_infinite2.a]], aggr=[[SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]]]
----TableScan: annotated_data_infinite2 projection=[a, c, d]
physical_plan
-ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c)@2 as summation1]
+ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1]
--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallyOrdered
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true
query III
SELECT a, d,
- SUM(c) as summation1
+ SUM(c ORDER BY a DESC) as summation1
FROM annotated_data_infinite2
GROUP BY d, a
----
@@ -2007,6 +2008,85 @@ SELECT a, d,
1 4 913
1 2 848
+# test_source_sorted_groupby3
+
+query TT
+EXPLAIN SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
+ FROM annotated_data_infinite2
+ GROUP BY a, b
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST] AS first_c
+--Aggregate: groupBy=[[annotated_data_infinite2.a, annotated_data_infinite2.b]], aggr=[[FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]]]
+----TableScan: annotated_data_infinite2 projection=[a, b, c]
+physical_plan
+ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+query III
+SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
+ FROM annotated_data_infinite2
+ GROUP BY a, b
+----
+0 0 0
+0 1 25
+1 2 50
+1 3 75
+
+# test_source_sorted_groupby4
+
+query TT
+EXPLAIN SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
+ FROM annotated_data_infinite2
+ GROUP BY a, b
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST] AS last_c
+--Aggregate: groupBy=[[annotated_data_infinite2.a, annotated_data_infinite2.b]], aggr=[[LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]]]
+----TableScan: annotated_data_infinite2 projection=[a, b, c]
+physical_plan
+ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c]
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+query III
+SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
+ FROM annotated_data_infinite2
+ GROUP BY a, b
+----
+0 0 24
+0 1 49
+1 2 74
+1 3 99
+
+# When LAST_VALUE or FIRST_VALUE does not have an ordering requirement,
+# queries should still work. However, the result depends on the scanning order
+# and is not deterministic.
+query TT
+EXPLAIN SELECT a, b, LAST_VALUE(c) as last_c
+ FROM annotated_data_infinite2
+ GROUP BY a, b
+----
+logical_plan
+Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(annotated_data_infinite2.c) AS last_c
+--Aggregate: groupBy=[[annotated_data_infinite2.a, annotated_data_infinite2.b]], aggr=[[LAST_VALUE(annotated_data_infinite2.c)]]
+----TableScan: annotated_data_infinite2 projection=[a, b, c]
+physical_plan
+ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c)@2 as last_c]
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+
+query III
+SELECT a, b, LAST_VALUE(c) as last_c
+ FROM annotated_data_infinite2
+ GROUP BY a, b
+----
+0 0 24
+0 1 49
+1 2 74
+1 3 99
+
statement ok
drop table annotated_data_infinite2;
@@ -2038,12 +2118,12 @@ EXPLAIN SELECT country, (ARRAY_AGG(amount ORDER BY amount ASC)) AS amounts
GROUP BY country
----
logical_plan
-Projection: sales_global.country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
---Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS amounts
+--Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
----TableScan: sales_global projection=[country, amount]
physical_plan
-ProjectionExec: expr=[country@0 as country, ARRAYAGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(sales_global.amount)]
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2067,13 +2147,13 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
GROUP BY s.country
----
logical_plan
-Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
----SubqueryAlias: s
------TableScan: sales_global projection=[country, amount]
physical_plan
-ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)]
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)]
----SortExec: expr=[amount@1 DESC]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2120,14 +2200,14 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.amount DESC) AS amounts,
GROUP BY s.country
----
logical_plan
-Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
----SubqueryAlias: s
------Sort: sales_global.country ASC NULLS LAST
--------TableScan: sales_global projection=[country, amount]
physical_plan
-ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2156,14 +2236,14 @@ EXPLAIN SELECT s.country, s.zip_code, ARRAY_AGG(s.amount ORDER BY s.amount DESC)
GROUP BY s.country, s.zip_code
----
logical_plan
-Projection: s.country, s.zip_code, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
+Projection: s.country, s.zip_code, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)]]
----SubqueryAlias: s
------Sort: sales_global.country ASC NULLS LAST
--------TableScan: sales_global projection=[zip_code, country, amount]
physical_plan
-ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, ARRAYAGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, SUM(s.amount)@3 as sum1]
---AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as zip_code], aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=PartiallyOrdered
+ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, SUM(s.amount)@3 as sum1]
+--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as zip_code], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=PartiallyOrdered
----SortExec: expr=[country@1 ASC NULLS LAST,amount@2 DESC]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2192,14 +2272,14 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC) AS amounts
GROUP BY s.country
----
logical_plan
-Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST], SUM(s.amount)]]
+Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST], SUM(s.amount)]]
----SubqueryAlias: s
------Sort: sales_global.country ASC NULLS LAST
--------TableScan: sales_global projection=[country, amount]
physical_plan
-ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
----SortExec: expr=[country@0 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2227,14 +2307,14 @@ EXPLAIN SELECT s.country, ARRAY_AGG(s.amount ORDER BY s.country DESC, s.amount D
GROUP BY s.country
----
logical_plan
-Projection: s.country, ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
---Aggregate: groupBy=[[s.country]], aggr=[[ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(s.amount)]]
+Projection: s.country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST] AS amounts, SUM(s.amount) AS sum1
+--Aggregate: groupBy=[[s.country]], aggr=[[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(s.amount)]]
----SubqueryAlias: s
------Sort: sales_global.country ASC NULLS LAST
--------TableScan: sales_global projection=[country, amount]
physical_plan
-ProjectionExec: expr=[country@0 as country, ARRAYAGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAYAGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
+ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1]
+--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=FullyOrdered
----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC]
------MemoryExec: partitions=1, partition_sizes=[1]
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 32f45dbb57..8ab9b29da4 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2007,16 +2007,16 @@ query TT
EXPLAIN SELECT ARRAY_AGG(c13) as array_agg1 FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 1)
----
logical_plan
-Projection: ARRAYAGG(aggregate_test_100.c13) AS array_agg1
---Aggregate: groupBy=[[]], aggr=[[ARRAYAGG(aggregate_test_100.c13)]]
+Projection: ARRAY_AGG(aggregate_test_100.c13) AS array_agg1
+--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(aggregate_test_100.c13)]]
----Limit: skip=0, fetch=1
------Sort: aggregate_test_100.c13 ASC NULLS LAST, fetch=1
--------TableScan: aggregate_test_100 projection=[c13]
physical_plan
-ProjectionExec: expr=[ARRAYAGG(aggregate_test_100.c13)@0 as array_agg1]
---AggregateExec: mode=Final, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]
+ProjectionExec: expr=[ARRAY_AGG(aggregate_test_100.c13)@0 as array_agg1]
+--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(aggregate_test_100.c13)]
----CoalescePartitionsExec
-------AggregateExec: mode=Partial, gby=[], aggr=[ARRAYAGG(aggregate_test_100.c13)]
+------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(aggregate_test_100.c13)]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------GlobalLimitExec: skip=0, fetch=1
------------SortExec: fetch=1, expr=[c13@0 ASC NULLS LAST]
@@ -3017,6 +3017,19 @@ SELECT a, b, c,
0 0 3 11 96 11 2 10 36 10 36 11 5 11 9
0 0 4 9 72 9 NULL 14 45 14 45 9 4 9 9
+#fn aggregate order by with window frame
+# In window expressions, aggregate functions should not have an ordering requirement; such requirements
+# should be defined in the window frame. Therefore, the query below should generate an error. Note that
+# PostgreSQL also behaves this way.
+statement error DataFusion error: Error during planning: Aggregate ORDER BY is not implemented for window functions
+SELECT SUM(b ORDER BY a ASC) OVER() as sum1
+ FROM annotated_data_infinite2
+
+# Even if the requirements of the window clause and the aggregate function match,
+# we should raise an error when an ordering requirement is given to an aggregate function in a window clause.
+statement error DataFusion error: Error during planning: Aggregate ORDER BY is not implemented for window functions
+EXPLAIN SELECT a, b, LAST_VALUE(c ORDER BY a ASC) OVER (order by a ASC) as last_c
+ FROM annotated_data_infinite2
statement ok
drop table annotated_data_finite2
@@ -3135,4 +3148,4 @@ SELECT
WINDOW window1 AS (ORDER BY C12),
window1 AS (ORDER BY C3)
ORDER BY C3
- LIMIT 5
\ No newline at end of file
+ LIMIT 5
diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs
index 7d5fa277de..8258c8b805 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -42,6 +42,10 @@ pub enum AggregateFunction {
ApproxDistinct,
/// array_agg
ArrayAgg,
+ /// first_value
+ FirstValue,
+ /// last_value
+ LastValue,
/// Variance (Sample)
Variance,
/// Variance (Population)
@@ -76,10 +80,43 @@ pub enum AggregateFunction {
BoolOr,
}
+impl AggregateFunction {
+ fn name(&self) -> &str {
+ use AggregateFunction::*;
+ match self {
+ Count => "COUNT",
+ Sum => "SUM",
+ Min => "MIN",
+ Max => "MAX",
+ Avg => "AVG",
+ Median => "MEDIAN",
+ ApproxDistinct => "APPROX_DISTINCT",
+ ArrayAgg => "ARRAY_AGG",
+ FirstValue => "FIRST_VALUE",
+ LastValue => "LAST_VALUE",
+ Variance => "VARIANCE",
+ VariancePop => "VARIANCE_POP",
+ Stddev => "STDDEV",
+ StddevPop => "STDDEV_POP",
+ Covariance => "COVARIANCE",
+ CovariancePop => "COVARIANCE_POP",
+ Correlation => "CORRELATION",
+ ApproxPercentileCont => "APPROX_PERCENTILE_CONT",
+ ApproxPercentileContWithWeight => "APPROX_PERCENTILE_CONT_WITH_WEIGHT",
+ ApproxMedian => "APPROX_MEDIAN",
+ Grouping => "GROUPING",
+ BitAnd => "BIT_AND",
+ BitOr => "BIT_OR",
+ BitXor => "BIT_XOR",
+ BoolAnd => "BOOL_AND",
+ BoolOr => "BOOL_OR",
+ }
+ }
+}
+
impl fmt::Display for AggregateFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- // uppercase of the debug.
- write!(f, "{}", format!("{self:?}").to_uppercase())
+ write!(f, "{}", self.name())
}
}
@@ -101,6 +138,8 @@ impl FromStr for AggregateFunction {
"min" => AggregateFunction::Min,
"sum" => AggregateFunction::Sum,
"array_agg" => AggregateFunction::ArrayAgg,
+ "first_value" => AggregateFunction::FirstValue,
+ "last_value" => AggregateFunction::LastValue,
// statistical
"corr" => AggregateFunction::Correlation,
"covar" => AggregateFunction::Covariance,
@@ -182,6 +221,9 @@ pub fn return_type(
Ok(coerced_data_types[0].clone())
}
AggregateFunction::Grouping => Ok(DataType::Int32),
+ AggregateFunction::FirstValue | AggregateFunction::LastValue => {
+ Ok(coerced_data_types[0].clone())
+ }
}
}
@@ -232,7 +274,9 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
| AggregateFunction::Stddev
| AggregateFunction::StddevPop
| AggregateFunction::Median
- | AggregateFunction::ApproxMedian => {
+ | AggregateFunction::ApproxMedian
+ | AggregateFunction::FirstValue
+ | AggregateFunction::LastValue => {
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
}
AggregateFunction::Covariance | AggregateFunction::CovariancePop => {
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs
index 2cc6c322e6..4f02bf3dfd 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -264,7 +264,9 @@ pub fn coerce_types(
}
Ok(input_types.to_vec())
}
- AggregateFunction::Median => Ok(input_types.to_vec()),
+ AggregateFunction::Median
+ | AggregateFunction::FirstValue
+ | AggregateFunction::LastValue => Ok(input_types.to_vec()),
AggregateFunction::Grouping => Ok(vec![input_types[0].clone()]),
}
}
diff --git a/datafusion/expr/src/window_function.rs b/datafusion/expr/src/window_function.rs
index 2d91dca8cc..1bae3a162e 100644
--- a/datafusion/expr/src/window_function.rs
+++ b/datafusion/expr/src/window_function.rs
@@ -42,10 +42,15 @@ pub enum WindowFunction {
/// Find DataFusion's built-in window function by name.
pub fn find_df_window_func(name: &str) -> Option<WindowFunction> {
let name = name.to_lowercase();
- if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) {
- Some(WindowFunction::AggregateFunction(aggregate))
- } else if let Ok(built_in_function) = BuiltInWindowFunction::from_str(name.as_str()) {
+ // Code paths for window functions leveraging ordinary aggregators and
+ // built-in window functions are quite different, and the same function
+ // may have different implementations for these cases. If the sought
+ // function is not found among built-in window functions, we search for
+ // it among aggregate functions.
+ if let Ok(built_in_function) = BuiltInWindowFunction::from_str(name.as_str()) {
Some(WindowFunction::BuiltInWindowFunction(built_in_function))
+ } else if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) {
+ Some(WindowFunction::AggregateFunction(aggregate))
} else {
None
}
@@ -53,19 +58,7 @@ pub fn find_df_window_func(name: &str) -> Option<WindowFunction> {
impl fmt::Display for BuiltInWindowFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"),
- BuiltInWindowFunction::Rank => write!(f, "RANK"),
- BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"),
- BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"),
- BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"),
- BuiltInWindowFunction::Ntile => write!(f, "NTILE"),
- BuiltInWindowFunction::Lag => write!(f, "LAG"),
- BuiltInWindowFunction::Lead => write!(f, "LEAD"),
- BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"),
- BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"),
- BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"),
- }
+ write!(f, "{}", self.name())
}
}
@@ -112,6 +105,25 @@ pub enum BuiltInWindowFunction {
NthValue,
}
+impl BuiltInWindowFunction {
+ fn name(&self) -> &str {
+ use BuiltInWindowFunction::*;
+ match self {
+ RowNumber => "ROW_NUMBER",
+ Rank => "RANK",
+ DenseRank => "DENSE_RANK",
+ PercentRank => "PERCENT_RANK",
+ CumeDist => "CUME_DIST",
+ Ntile => "NTILE",
+ Lag => "LAG",
+ Lead => "LEAD",
+ FirstValue => "FIRST_VALUE",
+ LastValue => "LAST_VALUE",
+ NthValue => "NTH_VALUE",
+ }
+ }
+}
+
impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index 2410f0147e..69ff89a392 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -308,6 +308,16 @@ pub fn create_aggregate_expr(
"MEDIAN(DISTINCT) aggregations are not available".to_string(),
));
}
+ (AggregateFunction::FirstValue, _) => Arc::new(expressions::FirstValue::new(
+ input_phy_exprs[0].clone(),
+ name,
+ input_phy_types[0].clone(),
+ )),
+ (AggregateFunction::LastValue, _) => Arc::new(expressions::LastValue::new(
+ input_phy_exprs[0].clone(),
+ name,
+ input_phy_types[0].clone(),
+ )),
})
}
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
new file mode 100644
index 0000000000..f65360c751
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the FIRST_VALUE/LAST_VALUE aggregations.
+
+use crate::aggregate::utils::down_cast_any_ref;
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
+
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::Array;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::Accumulator;
+
+use std::any::Any;
+use std::sync::Arc;
+
+/// FIRST_VALUE aggregate expression
+#[derive(Debug)]
+pub struct FirstValue {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl FirstValue {
+    /// Creates a new FIRST_VALUE aggregation function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for FirstValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(FirstValueAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "first_value"),
+            self.data_type.clone(),
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Reversing the input order turns FIRST_VALUE into LAST_VALUE.
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(LastValue::new(
+            self.expr.clone(),
+            self.name.clone(),
+            self.data_type.clone(),
+        )))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(FirstValueAccumulator::try_new(&self.data_type)?))
+    }
+}
+
+impl PartialEq<dyn Any> for FirstValue {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+/// Accumulator for FIRST_VALUE: keeps the first row it observes and ignores
+/// every row that arrives after it.
+#[derive(Debug)]
+struct FirstValueAccumulator {
+    first: ScalarValue,
+    // `false` until a row has been observed. Once set, `first` is frozen so a
+    // later batch cannot overwrite it with the head of that newer batch.
+    is_set: bool,
+}
+
+impl FirstValueAccumulator {
+    /// Creates a new `FirstValueAccumulator` for the given `data_type`.
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            first: ScalarValue::try_from(data_type)?,
+            is_set: false,
+        })
+    }
+}
+
+impl Accumulator for FirstValueAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.first.clone()])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        // Once the first value has been seen it must not be updated again.
+        let values = &values[0];
+        if !self.is_set && !values.is_empty() {
+            self.first = ScalarValue::try_from_array(values, 0)?;
+            self.is_set = true;
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        // FIRST_VALUE(first1, first2, first3, ...): the first partial state
+        // wins. NOTE(review): a partial accumulator that never saw a row
+        // reports its `try_new` initial value as state, and that value is
+        // merged like any other — confirm empty partial groups cannot occur.
+        self.update_batch(states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.first.clone())
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.first)
+            + self.first.size()
+    }
+}
+
+/// LAST_VALUE aggregate expression
+#[derive(Debug)]
+pub struct LastValue {
+    name: String,
+    pub data_type: DataType,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl LastValue {
+    /// Creates a new LAST_VALUE aggregation function.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            data_type,
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for LastValue {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(LastValueAccumulator::try_new(&self.data_type)?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "last_value"),
+            self.data_type.clone(),
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Reversing the input order turns LAST_VALUE into FIRST_VALUE.
+    fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
+        Some(Arc::new(FirstValue::new(
+            self.expr.clone(),
+            self.name.clone(),
+            self.data_type.clone(),
+        )))
+    }
+
+    fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(LastValueAccumulator::try_new(&self.data_type)?))
+    }
+}
+
+impl PartialEq<dyn Any> for LastValue {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+/// Accumulator for LAST_VALUE: keeps the most recently observed row.
+#[derive(Debug)]
+struct LastValueAccumulator {
+    last: ScalarValue,
+}
+
+impl LastValueAccumulator {
+    /// Creates a new `LastValueAccumulator` for the given `data_type`.
+    pub fn try_new(data_type: &DataType) -> Result<Self> {
+        Ok(Self {
+            last: ScalarValue::try_from(data_type)?,
+        })
+    }
+}
+
+impl Accumulator for LastValueAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![self.last.clone()])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = &values[0];
+        if !values.is_empty() {
+            // Update with last value in the array.
+            self.last = ScalarValue::try_from_array(values, values.len() - 1)?;
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        // LAST_VALUE(last1, last2, last3, ...)
+        self.update_batch(states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(self.last.clone())
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.last) + self.last.size()
+    }
+}
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 34302c5aaf..8da635cfb2 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -37,6 +37,7 @@ pub(crate) mod correlation;
pub(crate) mod count;
pub(crate) mod count_distinct;
pub(crate) mod covariance;
+pub(crate) mod first_last;
pub(crate) mod grouping;
pub(crate) mod median;
#[macro_use]
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index e65d17fa28..66d593c5ca 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -54,6 +54,7 @@ pub use crate::aggregate::correlation::Correlation;
pub use crate::aggregate::count::Count;
pub use crate::aggregate::count_distinct::DistinctCount;
pub use crate::aggregate::covariance::{Covariance, CovariancePop};
+pub use crate::aggregate::first_last::{FirstValue, LastValue};
pub use crate::aggregate::grouping::Grouping;
pub use crate::aggregate::median::Median;
pub use crate::aggregate::min_max::{Max, Min};
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 446b4a0275..7c35452085 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -570,6 +570,10 @@ enum AggregateFunction {
BIT_XOR = 21;
BOOL_AND = 22;
BOOL_OR = 23;
+ // When a function with the same name exists among built-in window functions,
+ // we append "_AGG" to obey name scoping rules.
+ FIRST_VALUE_AGG = 24;
+ LAST_VALUE_AGG = 25;
}
message AggregateExprNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 571fd45f8a..6dbe25c2d8 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -463,6 +463,8 @@ impl serde::Serialize for AggregateFunction {
Self::BitXor => "BIT_XOR",
Self::BoolAnd => "BOOL_AND",
Self::BoolOr => "BOOL_OR",
+ Self::FirstValueAgg => "FIRST_VALUE_AGG",
+ Self::LastValueAgg => "LAST_VALUE_AGG",
};
serializer.serialize_str(variant)
}
@@ -498,6 +500,8 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"BIT_XOR",
"BOOL_AND",
"BOOL_OR",
+ "FIRST_VALUE_AGG",
+ "LAST_VALUE_AGG",
];
struct GeneratedVisitor;
@@ -564,6 +568,8 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"BIT_XOR" => Ok(AggregateFunction::BitXor),
"BOOL_AND" => Ok(AggregateFunction::BoolAnd),
"BOOL_OR" => Ok(AggregateFunction::BoolOr),
+ "FIRST_VALUE_AGG" => Ok(AggregateFunction::FirstValueAgg),
+ "LAST_VALUE_AGG" => Ok(AggregateFunction::LastValueAgg),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index a5c0603239..7e48db10f3 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2400,6 +2400,10 @@ pub enum AggregateFunction {
BitXor = 21,
BoolAnd = 22,
BoolOr = 23,
+ /// When a function with the same name exists among built-in window functions,
+ /// we append "_AGG" to obey name scoping rules.
+ FirstValueAgg = 24,
+ LastValueAgg = 25,
}
impl AggregateFunction {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -2434,6 +2438,8 @@ impl AggregateFunction {
AggregateFunction::BitXor => "BIT_XOR",
AggregateFunction::BoolAnd => "BOOL_AND",
AggregateFunction::BoolOr => "BOOL_OR",
+ AggregateFunction::FirstValueAgg => "FIRST_VALUE_AGG",
+ AggregateFunction::LastValueAgg => "LAST_VALUE_AGG",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@@ -2465,6 +2471,8 @@ impl AggregateFunction {
"BIT_XOR" => Some(Self::BitXor),
"BOOL_AND" => Some(Self::BoolAnd),
"BOOL_OR" => Some(Self::BoolOr),
+ "FIRST_VALUE_AGG" => Some(Self::FirstValueAgg),
+ "LAST_VALUE_AGG" => Some(Self::LastValueAgg),
_ => None,
}
}
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index b40f867d98..1150220bef 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -529,6 +529,8 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
protobuf::AggregateFunction::ApproxMedian => Self::ApproxMedian,
protobuf::AggregateFunction::Grouping => Self::Grouping,
protobuf::AggregateFunction::Median => Self::Median,
+ protobuf::AggregateFunction::FirstValueAgg => Self::FirstValue,
+ protobuf::AggregateFunction::LastValueAgg => Self::LastValue,
}
}
}
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 06156c9f40..191c491944 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -388,6 +388,8 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
AggregateFunction::ApproxMedian => Self::ApproxMedian,
AggregateFunction::Grouping => Self::Grouping,
AggregateFunction::Median => Self::Median,
+ AggregateFunction::FirstValue => Self::FirstValueAgg,
+ AggregateFunction::LastValue => Self::LastValueAgg,
}
}
}
@@ -667,6 +669,12 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
}
AggregateFunction::Grouping => protobuf::AggregateFunction::Grouping,
AggregateFunction::Median => protobuf::AggregateFunction::Median,
+ AggregateFunction::FirstValue => {
+ protobuf::AggregateFunction::FirstValueAgg
+ }
+ AggregateFunction::LastValue => {
+ protobuf::AggregateFunction::LastValueAgg
+ }
};
let aggregate_expr = protobuf::AggregateExprNode {
diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs
index 0c5b460ead..70489203b2 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -53,6 +53,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return Ok(Expr::ScalarFunction(ScalarFunction::new(fun, args)));
};
+ // If function is a window function (it has an OVER clause),
+ // it shouldn't have ordering requirement as function argument
+ // required ordering should be defined in OVER clause.
+ if !function.order_by.is_empty() && function.over.is_some() {
+ return Err(DataFusionError::Plan(
+ "Aggregate ORDER BY is not implemented for window functions".to_string(),
+ ));
+ }
+
// then, window function
if let Some(WindowType::WindowSpec(window)) = function.over.take() {
let partition_by = window
@@ -107,10 +116,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// next, aggregate built-ins
if let Ok(fun) = AggregateFunction::from_str(&name) {
let distinct = function.distinct;
+ let order_by =
+ self.order_by_to_sort_expr(&function.order_by, schema, planner_context)?;
+ let order_by = (!order_by.is_empty()).then_some(order_by);
let (fun, args) =
self.aggregate_fn_to_expr(fun, function.args, schema, planner_context)?;
return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
- fun, args, distinct, None, None,
+ fun, args, distinct, None, order_by,
)));
};
diff --git a/datafusion/sql/tests/integration_test.rs b/datafusion/sql/tests/integration_test.rs
index a0d4d3d542..c8aaf060b8 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -1495,8 +1495,8 @@ fn select_count_column() {
#[test]
fn select_approx_median() {
let sql = "SELECT approx_median(age) FROM person";
- let expected = "Projection: APPROXMEDIAN(person.age)\
- \n Aggregate: groupBy=[[]], aggr=[[APPROXMEDIAN(person.age)]]\
+ let expected = "Projection: APPROX_MEDIAN(person.age)\
+ \n Aggregate: groupBy=[[]], aggr=[[APPROX_MEDIAN(person.age)]]\
\n TableScan: person";
quick_test(sql, expected);
}
@@ -2427,8 +2427,8 @@ fn approx_median_window() {
let sql =
"SELECT order_id, APPROX_MEDIAN(qty) OVER(PARTITION BY order_id) from orders";
let expected = "\
- Projection: orders.order_id, APPROXMEDIAN(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
- \n WindowAggr: windowExpr=[[APPROXMEDIAN(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
+ Projection: orders.order_id, APPROX_MEDIAN(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[APPROX_MEDIAN(orders.qty) PARTITION BY [orders.order_id] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: orders";
quick_test(sql, expected);
}
diff --git a/docs/source/user-guide/sql/aggregate_functions.md b/docs/source/user-guide/sql/aggregate_functions.md
index d02c733efc..132ba47e24 100644
--- a/docs/source/user-guide/sql/aggregate_functions.md
+++ b/docs/source/user-guide/sql/aggregate_functions.md
@@ -36,6 +36,8 @@ Aggregate functions operate on a set of values to compute a single result.
- [min](#min)
- [sum](#sum)
- [array_agg](#array_agg)
+- [first_value](#first_value)
+- [last_value](#last_value)
### `avg`
@@ -202,6 +204,32 @@ array_agg(expression [ORDER BY expression])
#### Arguments
+- **expression**: Expression to operate on.
+ Can be a constant, column, or function, and any combination of arithmetic operators.
+
+### `first_value`
+
+Returns the first element in an aggregation group according to the requested ordering. If no ordering is given, returns an arbitrary element from the group.
+
+```
+first_value(expression [ORDER BY expression])
+```
+
+#### Arguments
+
+- **expression**: Expression to operate on.
+ Can be a constant, column, or function, and any combination of arithmetic operators.
+
+### `last_value`
+
+Returns the last element in an aggregation group according to the requested ordering. If no ordering is given, returns an arbitrary element from the group.
+
+```
+last_value(expression [ORDER BY expression])
+```
+
+#### Arguments
+
- **expression**: Expression to operate on.
Can be a constant, column, or function, and any combination of arithmetic operators.