Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/06/21 10:32:20 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #6734: Add support for order-sensitive aggregation for multipartitions

mustafasrepo opened a new pull request, #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734

   # Which issue does this PR close?
   
   Closes #.
   
   # Rationale for this change
   Order-sensitive aggregators (`FIRST_VALUE`, `LAST_VALUE`, `ARRAY_AGG`) previously worked only in a single partition (`AggregateMode::Single` mode), which is limiting. This PR extends support to multiple partitions.
   With the changes in this PR, the query below
   ```sql
   SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
     LAST_VALUE(amount ORDER BY ts DESC) AS fv2
     FROM sales_global
   ```
   works both in a single partition (`set datafusion.execution.target_partitions = 1`) and in multiple partitions (`set datafusion.execution.target_partitions = 8`).
   
   # What changes are included in this PR?
   To support this feature:
   - Order-sensitive aggregators (in `AggregateMode::Partial` mode) now include their ordering expressions in their output table, so that subsequent aggregate stages in the plan (`AggregateMode::Final` or `AggregateMode::FinalPartitioned`) can enforce the required ordering.
   - A new variant of `ARRAY_AGG` (`OrderSensitiveArrayAgg`) is introduced, since its implementation is quite different from that of `ArrayAgg`.
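
The first bullet can be illustrated with a minimal sketch. It is not the code in this PR: `PartialFirst`, `partial_first_value`, and `final_first_value` are hypothetical names, and plain `i64`/`f64` columns stand in for DataFusion's arrays. The point it shows is that once each partial state carries its ordering key (`ts`), the final stage has enough information to order the states again.

```rust
/// Hypothetical partial state: the candidate value plus the ordering key
/// (here a single `ts` column) it was chosen by.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PartialFirst {
    amount: f64,
    ts: i64,
}

/// Partial stage: scan one partition's `(ts, amount)` rows and keep the row
/// with the smallest `ts`. Returns `None` for an empty partition.
fn partial_first_value(rows: &[(i64, f64)]) -> Option<PartialFirst> {
    rows.iter()
        .min_by_key(|(ts, _)| *ts)
        .map(|&(ts, amount)| PartialFirst { amount, ts })
}

/// Final stage: because every partial state still carries `ts`, the states
/// can be ordered again to pick the global first value.
fn final_first_value(states: &[PartialFirst]) -> Option<f64> {
    states.iter().min_by_key(|s| s.ts).map(|s| s.amount)
}
```

Without the `ts` field in the partial state, the final stage would only see one `amount` per partition and could not tell which of them comes first.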
   
   # Are these changes tested?
   Yes, new tests were added to the `groupby.slt` file (approximately 300 lines of changes) to verify that the new functionality works as expected with multiple partitions.
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238824885


##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -117,42 +153,57 @@ impl PartialEq<dyn Any> for FirstValue {
 #[derive(Debug)]
 struct FirstValueAccumulator {
     first: ScalarValue,
-    // At the beginning, `is_set` is `false`, this means `first` is not seen yet.
-    // Once we see (`is_set=true`) first value, we do not update `first`.
+    // At the beginning, `is_set` is false, which means `first` is not seen yet.

Review Comment:
   I don't have a strong preference -- `is_set` is fine from my perspective
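
For readers unfamiliar with the flag under discussion, here is a minimal sketch of the `is_set` pattern. It is a simplified stand-in, not the accumulator from the diff: `FirstValueSketch` is a hypothetical name and `i64` replaces `ScalarValue` so the example has no dependencies.

```rust
/// Simplified stand-in for the accumulator in the diff.
#[derive(Debug, Default)]
struct FirstValueSketch {
    first: i64,
    // `false` until the first row has been seen.
    is_set: bool,
}

impl FirstValueSketch {
    /// Feed one batch; only the very first row ever observed is recorded.
    fn update_batch(&mut self, batch: &[i64]) {
        if !self.is_set {
            if let Some(&value) = batch.first() {
                self.first = value;
                self.is_set = true;
            }
        }
    }

    /// Result so far, or `None` if no row has been seen.
    fn evaluate(&self) -> Option<i64> {
        if self.is_set {
            Some(self.first)
        } else {
            None
        }
    }
}
```

The flag is what lets later batches be ignored once a value has been recorded, including the case where the recorded value happens to equal the default.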





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238698423


##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2076,18 +2076,18 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
 SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24

Review Comment:
   I was just looking at the output (not the plan) -- I see now that the `ORDER BY` is on `a` but the value is `c`.
   
   Since the query groups by `a, b`, every group that `FIRST_VALUE` is evaluated on contains a single value of `a`, so `ORDER BY a` does not define a total ordering within the group and `FIRST_VALUE` is effectively arbitrary.
   
   When I printed out the values in `annotated_data_infinite2`, it became clearer to me that the output of this query is "undefined" in the sense that any of the values of `c` in a group is acceptable (I wonder if this test will therefore be unstable 🤔). Maybe we can make the query more representative in the future.
   
   ```
   query III
   select a, b, c from annotated_data_infinite2 order by a, b, c;
   ----
   0 0 0
   0 0 1
   0 0 2
   0 0 3
   0 0 4
   0 0 5
   0 0 6
   0 0 7
   0 0 8
   0 0 9
   0 0 10
   0 0 11
   0 0 12
   0 0 13
   0 0 14
   0 0 15
   0 0 16
   0 0 17
   0 0 18
   0 0 19
   0 0 20
   0 0 21
   0 0 22
   0 0 23
   0 0 24
   0 1 25
   0 1 26
   0 1 27
   0 1 28
   0 1 29
   0 1 30
   0 1 31
   0 1 32
   0 1 33
   0 1 34
   0 1 35
   0 1 36
   0 1 37
   0 1 38
   0 1 39
   0 1 40
   0 1 41
   0 1 42
   0 1 43
   0 1 44
   0 1 45
   0 1 46
   0 1 47
   0 1 48
   0 1 49
   1 2 50
   1 2 51
   1 2 52
   1 2 53
   1 2 54
   1 2 55
   1 2 56
   1 2 57
   1 2 58
   1 2 59
   1 2 60
   1 2 61
   1 2 62
   1 2 63
   1 2 64
   1 2 65
   1 2 66
   1 2 67
   1 2 68
   1 2 69
   1 2 70
   1 2 71
   1 2 72
   1 2 73
   1 2 74
   1 3 75
   1 3 76
   1 3 77
   1 3 78
   1 3 79
   1 3 80
   1 3 81
   1 3 82
   1 3 83
   1 3 84
   1 3 85
   1 3 86
   1 3 87
   1 3 88
   1 3 89
   1 3 90
   1 3 91
   1 3 92
   1 3 93
   1 3 94
   1 3 95
   1 3 96
   1 3 97
   1 3 98
   1 3 99
   ``` 
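
The point about tied ordering keys can be made concrete with a small sketch. The function names are hypothetical and rows are reduced to `(a, c)` pairs from a single `GROUP BY a, b` group. The second function assumes one possible way to make the query deterministic, adding `c` as a tie-breaker; that is an illustration, not something the PR does.

```rust
/// Rows are `(a, c)` pairs from one `GROUP BY a, b` group.
/// Returns every `c` that is a legal answer for
/// `FIRST_VALUE(c ORDER BY a DESC)`: all rows tied on the largest `a`.
fn acceptable_first_values(group: &[(i32, i32)]) -> Vec<i32> {
    let max_a = match group.iter().map(|&(a, _)| a).max() {
        Some(a) => a,
        None => return Vec::new(),
    };
    group
        .iter()
        .filter(|&&(a, _)| a == max_a)
        .map(|&(_, c)| c)
        .collect()
}

/// Assumed fix: with `ORDER BY a DESC, c ASC` only one answer remains.
fn first_value_with_tiebreak(group: &[(i32, i32)]) -> Option<i32> {
    group
        .iter()
        .min_by_key(|&&(a, c)| (std::cmp::Reverse(a), c))
        .map(|&(_, c)| c)
}
```

For the group `a = 0, b = 0` listed above, the first function returns all 25 values of `c`, which is why both `0` and `24` are acceptable outputs for that group.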





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238658754


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -81,6 +81,8 @@ pub enum AggregateMode {
 /// Group By expression modes
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum GroupByOrderMode {
+    /// None of the expressions in the GROUP BY clause have an ordering.
+    Linear,

Review Comment:
   Replaced it





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238583660


##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2076,18 +2076,18 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
 SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24

Review Comment:
   I think this comment needs to be addressed prior to merge (the change may be ok, but I am not sure so it deserves a second look)





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238676008


##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2580,12 +2580,280 @@ FRA 200 50 250
 
 # Run order-sensitive aggregators in multiple partitions
 statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 8;
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions also.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]

Review Comment:
   Yes, exactly





[GitHub] [arrow-datafusion] alamb merged pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238698423


##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2076,18 +2076,18 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
 SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24

Review Comment:
   (this change now makes sense to me)





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238686892


##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+    name: String,
+    input_data_type: DataType,
+    order_by_data_types: Vec<DataType>,
+    expr: Arc<dyn PhysicalExpr>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+    /// Create a new `OrderSensitiveArrayAgg` aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+        order_by_data_types: Vec<DataType>,
+        ordering_req: LexOrdering,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            input_data_type,
+            order_by_data_types,
+            ordering_req,
+        }
+    }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new_list(
+            &self.name,
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+            self.ordering_req.clone(),
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let mut fields = vec![Field::new_list(
+            format_state_name(&self.name, "array_agg"),
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        )];
+        let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
+        fields.push(Field::new_list(
+            format_state_name(&self.name, "array_agg_orderings"),
+            Field::new(
+                "item",
+                DataType::Struct(Fields::from(orderings.clone())),
+                true,
+            ),
+            false,
+        ));
+        fields.extend(orderings);
+        Ok(fields)
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.ordering_req.is_empty() {
+            None
+        } else {
+            Some(&self.ordering_req)
+        }
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.input_data_type == x.input_data_type
+                    && self.order_by_data_types == x.order_by_data_types
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+    values: Vec<ScalarValue>,

Review Comment:
   Updated the documentation
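
The quoted file imports `BinaryHeap` and `compare_rows`, and its state fields carry both the aggregated values and their ordering values. One plausible reading, and only an assumption here, is that per-partition results arrive already sorted and are combined with a k-way merge. A minimal sketch of that technique follows, with a hypothetical `merge_ordered_runs` function and plain integers in place of `ScalarValue`:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge per-partition runs of `(ordering_key, value)` pairs, each already
/// sorted ascending by key, into one list of values in global key order.
fn merge_ordered_runs(runs: &[Vec<(i64, i32)>]) -> Vec<i32> {
    // Heap entries are (key, run index, position). `Reverse` turns the
    // max-heap into a min-heap so the smallest key is popped first.
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some((key, _)) = run.first() {
            heap.push(Reverse((*key, run_idx, 0usize)));
        }
    }
    let mut merged = Vec::new();
    while let Some(Reverse((_, run_idx, pos))) = heap.pop() {
        merged.push(runs[run_idx][pos].1);
        // Queue the next element of the run we just consumed from.
        if let Some((next_key, _)) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((*next_key, run_idx, pos + 1)));
        }
    }
    merged
}
```

Ties on the key are broken by run index and then position in this sketch; the real accumulator may resolve them differently.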





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238673844


##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2076,18 +2076,18 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
 SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24

Review Comment:
   The reason is that the source is ordered by `a ASC`. Since the aggregation requires ordering by `a DESC`, the planner reverses the aggregator (`FIRST_VALUE` becomes `LAST_VALUE`) so that the existing ordering satisfies the requirement. However, since the requirement column is among the GROUP BY columns (`group by a, b`), every row in a group has the same `a` value. Hence both `a DESC` and `a ASC` are already satisfied within each group, and any row of the group is a valid result. In short, the previous plan and this plan are both valid.
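
   The tie argument above can be checked with a small standalone sketch. It does not use DataFusion; `Row`, `first_value_desc`, `last_value_asc` and `example_group` are hypothetical names, and the group contents (`c = 0..=24` for `a = 0`) are an assumption chosen to match the `0` and `24` in the changed test output.

```rust
/// A row of the example table: `a` is both a GROUP BY column and the ORDER BY key.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Row {
    a: i64,
    c: i64,
}

/// FIRST_VALUE(c ORDER BY a DESC): stable sort by `a` descending, take the first row.
fn first_value_desc(rows: &[Row]) -> Option<i64> {
    let mut sorted = rows.to_vec();
    sorted.sort_by(|x, y| y.a.cmp(&x.a)); // `sort_by` is a stable sort
    sorted.first().map(|r| r.c)
}

/// LAST_VALUE(c ORDER BY a ASC): stable sort by `a` ascending, take the last row.
fn last_value_asc(rows: &[Row]) -> Option<i64> {
    let mut sorted = rows.to_vec();
    sorted.sort_by(|x, y| x.a.cmp(&y.a));
    sorted.last().map(|r| r.c)
}

/// Hypothetical group (a = 0, b = 0) holding c = 0..=24 in source order.
fn example_group() -> Vec<Row> {
    (0..=24).map(|c| Row { a: 0, c }).collect()
}
```

   When every `a` in the group is equal, the two formulations pick different rows (the first and the last of the group), yet both respect the requested ordering. When the `a` values are distinct, the two formulations pick the same row.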





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238773537


##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+    name: String,
+    input_data_type: DataType,
+    order_by_data_types: Vec<DataType>,
+    expr: Arc<dyn PhysicalExpr>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+    /// Create a new `OrderSensitiveArrayAgg` aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+        order_by_data_types: Vec<DataType>,
+        ordering_req: LexOrdering,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            input_data_type,
+            order_by_data_types,
+            ordering_req,
+        }
+    }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new_list(
+            &self.name,
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+            self.ordering_req.clone(),
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let mut fields = vec![Field::new_list(
+            format_state_name(&self.name, "array_agg"),
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        )];
+        let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
+        fields.push(Field::new_list(
+            format_state_name(&self.name, "array_agg_orderings"),
+            Field::new(
+                "item",
+                DataType::Struct(Fields::from(orderings.clone())),
+                true,
+            ),
+            false,
+        ));
+        fields.extend(orderings);
+        Ok(fields)
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.ordering_req.is_empty() {
+            None
+        } else {
+            Some(&self.ordering_req)
+        }
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.input_data_type == x.input_data_type
+                    && self.order_by_data_types == x.order_by_data_types
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+    values: Vec<ScalarValue>,
+    ordering_values: Vec<Vec<ScalarValue>>,
+    datatypes: Vec<DataType>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    /// Create a new order-sensitive ARRAY_AGG accumulator based on the given
+    /// item data type.
+    pub fn try_new(
+        datatype: &DataType,
+        ordering_dtypes: &[DataType],
+        ordering_req: LexOrdering,
+    ) -> Result<Self> {
+        let mut datatypes = vec![datatype.clone()];
+        datatypes.extend(ordering_dtypes.iter().cloned());
+        Ok(Self {
+            values: vec![],
+            ordering_values: vec![],
+            datatypes,
+            ordering_req,
+        })
+    }
+}
+
+impl Accumulator for OrderSensitiveArrayAggAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let n_row = values[0].len();
+        for index in 0..n_row {
+            let row = get_row_at_idx(values, index)?;
+            self.values.push(row[0].clone());
+            self.ordering_values.push(row[1..].to_vec());
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+        // First entry in the state is the aggregation result.
+        let array_agg_values = &states[0];
+        // 2nd entry stores values received for ordering requirement columns, for each aggregation value inside ARRAY_AGG list.
+        // For each `ScalarValue` inside ARRAY_AGG list, we will receive a `Vec<ScalarValue>` that stores
+        // values received from its ordering requirement expression. (This information is necessary for during merging).
+        let agg_orderings = &states[1];
+        if agg_orderings.as_any().is::<ListArray>() {
+            // Stores ARRAY_AGG results coming from each partition
+            let mut partition_values = vec![];
+            // Stores ordering requirement expression results coming from each partition
+            let mut partition_ordering_values = vec![];
+            for index in 0..agg_orderings.len() {
+                let ordering = ScalarValue::try_from_array(agg_orderings, index)?;
+                // Ordering requirement expression values for each entry in the ARRAY_AGG list
+                let other_ordering_values =
+                    self.convert_array_agg_to_orderings(ordering)?;
+                // ARRAY_AGG result. (It is a `ScalarValue::List` under the hood, it stores `Vec<ScalarValue>`)
+                let array_agg_res = ScalarValue::try_from_array(array_agg_values, index)?;
+                if let ScalarValue::List(Some(other_values), _) = array_agg_res {
+                    partition_values.push(other_values);
+                    partition_ordering_values.push(other_ordering_values);
+                } else {
+                    return Err(DataFusionError::Internal(
+                        "ARRAY_AGG state must be list!".into(),
+                    ));
+                }
+            }
+            let sort_options = self
+                .ordering_req
+                .iter()
+                .map(|sort_expr| sort_expr.options)
+                .collect::<Vec<_>>();
+            self.values = merge_ordered_arrays(
+                &partition_values,
+                &partition_ordering_values,
+                &sort_options,
+            )?;
+        } else {
+            return Err(DataFusionError::Execution(
+                "Expects to receive a list array".to_string(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let mut result = vec![self.evaluate()?];
+        result.push(self.evaluate_orderings()?);
+        let last_ordering = if let Some(ordering) = self.ordering_values.last() {
+            ordering.clone()
+        } else {
+            // In case ordering is empty, construct ordering as NULL:
+            self.datatypes
+                .iter()
+                .skip(1)
+                .map(ScalarValue::try_from)
+                .collect::<Result<Vec<_>>>()?
+        };
+        result.extend(last_ordering);
+        Ok(result)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::new_list(
+            Some(self.values.clone()),
+            self.datatypes[0].clone(),
+        ))
+    }
+
+    fn size(&self) -> usize {
+        let mut total = std::mem::size_of_val(self)
+            + ScalarValue::size_of_vec(&self.values)
+            - std::mem::size_of_val(&self.values);
+
+        // Add size of the `self.ordering_values`
+        total +=
+            std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
+        for row in &self.ordering_values {
+            total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
+        }
+
+        // Add size of the `self.datatypes`
+        total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
+        for dtype in &self.datatypes {
+            total += dtype.size() - std::mem::size_of_val(dtype);
+        }
+
+        // Add size of the `self.ordering_req`
+        total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
+        // TODO: Calculate size of each `PhysicalSortExpr` more accurately.
+        total
+    }
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    fn convert_array_agg_to_orderings(
+        &self,
+        in_data: ScalarValue,
+    ) -> Result<Vec<Vec<ScalarValue>>> {
+        if let ScalarValue::List(Some(list_vals), _field_ref) = in_data {
+            list_vals.into_iter().map(|struct_vals| {
+                if let ScalarValue::Struct(Some(orderings), _fields) = struct_vals {
+                    Ok(orderings)
+                } else {
+                    Err(DataFusionError::Execution(format!(
+                        "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
+                        struct_vals.get_datatype()
+                    )))
+                }
+            }).collect::<Result<Vec<_>>>()
+        } else {
+            Err(DataFusionError::Execution(format!(
+                "Expects to receive ScalarValue::List(Some(..), _) but got:{:?}",
+                in_data.get_datatype()
+            )))
+        }
+    }
+
+    fn evaluate_orderings(&self) -> Result<ScalarValue> {
+        let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
+        let struct_field = Fields::from(fields.clone());
+        let orderings = self
+            .ordering_values
+            .iter()
+            .map(|ordering| {
+                ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
+            })
+            .collect();
+        let struct_type = DataType::Struct(Fields::from(fields));
+        Ok(ScalarValue::new_list(Some(orderings), struct_type))
+    }
+}
+
+/// This is a wrapper struct to be able to correctly merge ARRAY_AGG
+/// data from multiple partitions using `BinaryHeap`.
+/// When used inside `BinaryHeap` this struct returns smallest `CustomElement`,
+/// where smallest is determined by `ordering` values (`Vec<ScalarValue>`)
+/// according to `sort_options`
+#[derive(Debug, PartialEq, Eq)]
+struct CustomElement<'a> {
+    // Stores from which partition entry is received
+    branch_idx: usize,
+    // values to be merged
+    value: ScalarValue,
+    // according to `ordering` values, comparisons will be done.
+    ordering: Vec<ScalarValue>,
+    // `sort_options` defines, desired ordering by the user
+    sort_options: &'a [SortOptions],
+}
+
+impl<'a> CustomElement<'a> {
+    fn new(
+        branch_idx: usize,
+        value: ScalarValue,
+        ordering: Vec<ScalarValue>,
+        sort_options: &'a [SortOptions],
+    ) -> Self {
+        Self {
+            branch_idx,
+            value,
+            ordering,
+            sort_options,
+        }
+    }
+
+    fn ordering(
+        &self,
+        current: &[ScalarValue],
+        target: &[ScalarValue],
+    ) -> Result<Ordering> {
+        // Calculate ordering according to `sort_options`
+        compare_rows(current, target, self.sort_options)
+    }
+}
+
+// Overwrite ordering implementation such that
+// - `self.ordering` values are used for comparison,
+// - When used inside `BinaryHeap` it is a min-heap.
+impl<'a> Ord for CustomElement<'a> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // Compares according to custom ordering
+        self.ordering(&self.ordering, &other.ordering)
+            // Convert max heap to min heap
+            .map(|ordering| ordering.reverse())
+            // This function return error, when `self.ordering` and `other.ordering`
+            // have different types (such as one is `ScalarValue::Int64`, other is `ScalarValue::Float32`)
+            // Here this case won't happen, because data from each partition will have same type
+            .unwrap()
+    }
+}
+
+impl<'a> PartialOrd for CustomElement<'a> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single array `Vec<ScalarValue>`
+/// Merging done according to ordering values stored inside `ordering_values` (`&[Vec<Vec<ScalarValue>>]`)
+/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as ordering information for the
+/// each `ScalarValue` in the `values` array.
+/// Desired ordering specified by `sort_options` argument (Should have same size with inner `Vec<ScalarValue>`
+/// of the `ordering_values` array).
+///
+/// As an example
+/// values can be \[
+///      \[1, 2, 3, 4, 5\],
+///      \[1, 2, 3, 4\],
+///      \[1, 2, 3, 4, 5, 6\],
+/// \]
+/// In this case we will be merging three arrays (doesn't have to be same size)
+/// and produce a merged array with size 15 (sum of 5+4+6)
+/// Merging will be done according to ordering at `ordering_values` vector.
+/// As an example `ordering_values` can be [
+///      \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
+///      \[(1, a), (2, b), (3, b), (4, a) \],
+///      \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
+/// ]
+/// For each ScalarValue in the `values` we have a corresponding `Vec<ScalarValue>` (like timestamp of it)
+/// for the example above `sort_options` will have size two, that defines ordering requirement of the merge.
+/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared according `sort_options` (Their sizes should match)
+fn merge_ordered_arrays(
+    // We will merge values into single `Vec<ScalarValue>`.
+    values: &[Vec<ScalarValue>],
+    // `values` will be merged according to `ordering_values`.
+    // Inner `Vec<ScalarValue>` can be thought as ordering information for the
+    // each `ScalarValue` in the values`.
+    ordering_values: &[Vec<Vec<ScalarValue>>],
+    // Defines according to which ordering comparisons should be done.
+    sort_options: &[SortOptions],
+) -> Result<Vec<ScalarValue>> {
+    // Keep track the most recent data of each branch, in binary heap data structure.
+    let mut heap: BinaryHeap<CustomElement> = BinaryHeap::new();
+
+    if !(values.len() == ordering_values.len()
+        && values
+            .iter()
+            .zip(ordering_values.iter())
+            .all(|(vals, ordering_vals)| vals.len() == ordering_vals.len()))
+    {
+        return Err(DataFusionError::Execution(
+            "Expects values arguments and/or ordering_values arguments to have same size"
+                .to_string(),
+        ));
+    }
+    let n_branch = values.len();
+    // For each branch we keep track of indices of next will be merged entry

Review Comment:
   It is quite similar in spirit. However, `sort_preserving_merge` works on streams and internally uses a winner tree. I tried to reuse it, but found it hard to incorporate here.
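
   For readers unfamiliar with the heap-based approach, here is a minimal standalone sketch of a k-way merge over in-memory vectors using `std::collections::BinaryHeap`. It is not the code from this PR: `merge_sorted_branches` is a hypothetical name, plain `i64` values stand in for `ScalarValue`, and ascending order stands in for the `sort_options` comparison.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted (ascending) branches into one sorted vector.
fn merge_sorted_branches(branches: &[Vec<i64>]) -> Vec<i64> {
    // Heap entries are (value, branch index, position in branch).
    // `Reverse` turns the standard max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (branch_idx, branch) in branches.iter().enumerate() {
        if let Some(&first) = branch.first() {
            heap.push(Reverse((first, branch_idx, 0usize)));
        }
    }
    let total: usize = branches.iter().map(|b| b.len()).sum();
    let mut merged = Vec::with_capacity(total);
    // Pop the smallest head, then push the next entry of the same branch.
    while let Some(Reverse((value, branch_idx, pos))) = heap.pop() {
        merged.push(value);
        if let Some(&next) = branches[branch_idx].get(pos + 1) {
            heap.push(Reverse((next, branch_idx, pos + 1)));
        }
    }
    merged
}
```

   The heap never holds more than one entry per branch, so each pop or push costs O(log k) for k branches.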





[GitHub] [arrow-datafusion] ozankabak commented on pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#issuecomment-1604317287

   This is good to go from my perspective. If any issue that we might have missed comes up, we will address it in a follow-on PR. Thanks for the review @alamb!




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238526016


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -81,6 +81,8 @@ pub enum AggregateMode {
 /// Group By expression modes
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum GroupByOrderMode {
+    /// None of the expressions in the GROUP BY clause have an ordering.
+    Linear,

Review Comment:
   I wonder if `None` would be a clearer name here



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2580,12 +2580,280 @@ FRA 200 50 250
 
 # Run order-sensitive aggregators in multiple partitions
 statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 8;
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions also.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]

Review Comment:
   This second (re)sort is needed because `RepartitionExec: partitioning=Hash` does not preserve the ordering, right?
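
   If that reading is right, the effect can be illustrated with a toy model that does not use DataFusion. In the sketch below, `Row`, `hash_repartition` and `is_sorted_by_ts` are hypothetical names, `key % n_out` stands in for a real hash, and strict round-robin consumption of the inputs is an assumption used to mimic rows arriving interleaved from several input partitions.

```rust
/// A row with a timestamp and a grouping key (hypothetical, simplified schema).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Row {
    ts: i64,
    key: u64,
}

/// Route rows to `n_out` output partitions by `key % n_out` (a stand-in for a hash).
/// Inputs are consumed round-robin, one row at a time. `n_out` must be non-zero.
fn hash_repartition(inputs: &[Vec<Row>], n_out: usize) -> Vec<Vec<Row>> {
    let mut outputs = vec![Vec::new(); n_out];
    let longest = inputs.iter().map(|p| p.len()).max().unwrap_or(0);
    for pos in 0..longest {
        for input in inputs {
            if let Some(row) = input.get(pos) {
                outputs[(row.key % n_out as u64) as usize].push(*row);
            }
        }
    }
    outputs
}

/// True when the rows are in non-decreasing `ts` order.
fn is_sorted_by_ts(rows: &[Row]) -> bool {
    rows.windows(2).all(|w| w[0].ts <= w[1].ts)
}
```

   Two inputs that are each sorted by `ts` can land in the same output partition in interleaved order, so the output partition is no longer sorted by `ts` and needs a sort before an order-sensitive final aggregate.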



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2076,18 +2076,18 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
 SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24

Review Comment:
   Why did this output change? I also don't understand why the plan changed from `FIRST_VALUE` to `LAST_VALUE` when the query did not change



##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.

Review Comment:
   It might also help to add some comments here explaining that this special operator is required to handle multi-phase / multi-partition aggregation, where the aggregate is computed from multiple intermediate aggregates that need to be combined.



##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -117,42 +153,57 @@ impl PartialEq<dyn Any> for FirstValue {
 #[derive(Debug)]
 struct FirstValueAccumulator {
     first: ScalarValue,
-    // At the beginning, `is_set` is `false`, this means `first` is not seen yet.
-    // Once we see (`is_set=true`) first value, we do not update `first`.
+    // At the beginning, `is_set` is false, which means `first` is not seen yet.

Review Comment:
   An alternate formulation would be to store `first: Option<ScalarValue>`, which would allow the compiler to ensure you checked for `first`'s initialization in all cases
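
   A rough sketch of what that formulation could look like, outside DataFusion: `FirstValueSketch` is a hypothetical, simplified stand-in for `FirstValueAccumulator`, and `i64` replaces `ScalarValue` to keep the example self-contained.

```rust
/// Simplified stand-in for `FirstValueAccumulator`; `i64` replaces `ScalarValue`.
#[derive(Debug, Default)]
struct FirstValueSketch {
    // `None` means no value has been seen yet; there is no separate `is_set` flag.
    first: Option<i64>,
}

impl FirstValueSketch {
    /// Record the first value of the batch unless one is already stored.
    fn update_batch(&mut self, values: &[i64]) {
        if self.first.is_none() {
            self.first = values.first().copied();
        }
    }

    /// Return the stored value, or `None` when no input was seen.
    fn evaluate(&self) -> Option<i64> {
        self.first
    }
}
```

   With this shape, every read of `first` has to go through the `Option`, so forgetting the "not set yet" case becomes a compile-time error rather than a logic bug.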



##########
datafusion/physical-expr/src/aggregate/mod.rs:
##########
@@ -85,6 +86,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
     /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
 
+    /// Order by requirements for the aggregate function
+    /// By default it is `None` (there is no requirement)
+    /// Order-sensitive aggregators should implement this

Review Comment:
   ```suggestion
       /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` should implement this
       /// so that DataFusion will ensure the input is sorted correctly
   ```
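
   As an illustration of the default-`None` contract described in that doc comment, here is a pared-down sketch. `AggregateSketch`, `SortKey`, `Sum` and `FirstValue` are hypothetical simplifications, not the real `AggregateExpr` trait or its implementors.

```rust
/// Hypothetical, pared-down sort key: column name plus direction.
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: String,
    descending: bool,
}

trait AggregateSketch {
    fn name(&self) -> &str;

    /// Ordering requirement of the aggregate; `None` by default.
    fn order_bys(&self) -> Option<&[SortKey]> {
        None
    }
}

/// Order-insensitive aggregate: keeps the default `None`.
struct Sum;

impl AggregateSketch for Sum {
    fn name(&self) -> &str {
        "SUM"
    }
}

/// Order-sensitive aggregate: reports its requirement so the input can be sorted.
struct FirstValue {
    ordering_req: Vec<SortKey>,
}

impl AggregateSketch for FirstValue {
    fn name(&self) -> &str {
        "FIRST_VALUE"
    }

    fn order_bys(&self) -> Option<&[SortKey]> {
        if self.ordering_req.is_empty() {
            None
        } else {
            Some(self.ordering_req.as_slice())
        }
    }
}
```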



##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+    name: String,
+    input_data_type: DataType,
+    order_by_data_types: Vec<DataType>,
+    expr: Arc<dyn PhysicalExpr>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+    /// Create a new `OrderSensitiveArrayAgg` aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+        order_by_data_types: Vec<DataType>,
+        ordering_req: LexOrdering,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            input_data_type,
+            order_by_data_types,
+            ordering_req,
+        }
+    }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new_list(
+            &self.name,
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+            self.ordering_req.clone(),
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let mut fields = vec![Field::new_list(
+            format_state_name(&self.name, "array_agg"),
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        )];
+        let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
+        fields.push(Field::new_list(
+            format_state_name(&self.name, "array_agg_orderings"),
+            Field::new(
+                "item",
+                DataType::Struct(Fields::from(orderings.clone())),
+                true,
+            ),
+            false,
+        ));
+        fields.extend(orderings);
+        Ok(fields)
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.ordering_req.is_empty() {
+            None
+        } else {
+            Some(&self.ordering_req)
+        }
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.input_data_type == x.input_data_type
+                    && self.order_by_data_types == x.order_by_data_types
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+    values: Vec<ScalarValue>,
+    ordering_values: Vec<Vec<ScalarValue>>,
+    datatypes: Vec<DataType>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    /// Create a new order-sensitive ARRAY_AGG accumulator based on the given
+    /// item data type.
+    pub fn try_new(
+        datatype: &DataType,
+        ordering_dtypes: &[DataType],
+        ordering_req: LexOrdering,
+    ) -> Result<Self> {
+        let mut datatypes = vec![datatype.clone()];
+        datatypes.extend(ordering_dtypes.iter().cloned());
+        Ok(Self {
+            values: vec![],
+            ordering_values: vec![],
+            datatypes,
+            ordering_req,
+        })
+    }
+}
+
+impl Accumulator for OrderSensitiveArrayAggAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let n_row = values[0].len();
+        for index in 0..n_row {
+            let row = get_row_at_idx(values, index)?;
+            self.values.push(row[0].clone());
+            self.ordering_values.push(row[1..].to_vec());
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+        // First entry in the state is the aggregation result.
+        let array_agg_values = &states[0];
+        // 2nd entry stores values received for ordering requirement columns, for each aggregation value inside ARRAY_AGG list.
+        // For each `ScalarValue` inside ARRAY_AGG list, we will receive a `Vec<ScalarValue>` that stores
+        // values received from its ordering requirement expression. (This information is necessary for during merging).
+        let agg_orderings = &states[1];
+        if agg_orderings.as_any().is::<ListArray>() {
+            // Stores ARRAY_AGG results coming from each partition
+            let mut partition_values = vec![];
+            // Stores ordering requirement expression results coming from each partition
+            let mut partition_ordering_values = vec![];
+            for index in 0..agg_orderings.len() {
+                let ordering = ScalarValue::try_from_array(agg_orderings, index)?;
+                // Ordering requirement expression values for each entry in the ARRAY_AGG list
+                let other_ordering_values =
+                    self.convert_array_agg_to_orderings(ordering)?;
+                // ARRAY_AGG result. (It is a `ScalarValue::List` under the hood, it stores `Vec<ScalarValue>`)
+                let array_agg_res = ScalarValue::try_from_array(array_agg_values, index)?;
+                if let ScalarValue::List(Some(other_values), _) = array_agg_res {
+                    partition_values.push(other_values);
+                    partition_ordering_values.push(other_ordering_values);
+                } else {
+                    return Err(DataFusionError::Internal(
+                        "ARRAY_AGG state must be list!".into(),
+                    ));
+                }
+            }
+            let sort_options = self
+                .ordering_req
+                .iter()
+                .map(|sort_expr| sort_expr.options)
+                .collect::<Vec<_>>();
+            self.values = merge_ordered_arrays(
+                &partition_values,
+                &partition_ordering_values,
+                &sort_options,
+            )?;
+        } else {
+            return Err(DataFusionError::Execution(
+                "Expects to receive a list array".to_string(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let mut result = vec![self.evaluate()?];
+        result.push(self.evaluate_orderings()?);
+        let last_ordering = if let Some(ordering) = self.ordering_values.last() {
+            ordering.clone()
+        } else {
+            // In case ordering is empty, construct ordering as NULL:
+            self.datatypes
+                .iter()
+                .skip(1)
+                .map(ScalarValue::try_from)
+                .collect::<Result<Vec<_>>>()?
+        };
+        result.extend(last_ordering);
+        Ok(result)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::new_list(
+            Some(self.values.clone()),
+            self.datatypes[0].clone(),
+        ))
+    }
+
+    fn size(&self) -> usize {
+        let mut total = std::mem::size_of_val(self)
+            + ScalarValue::size_of_vec(&self.values)
+            - std::mem::size_of_val(&self.values);
+
+        // Add size of the `self.ordering_values`
+        total +=
+            std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
+        for row in &self.ordering_values {
+            total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
+        }
+
+        // Add size of the `self.datatypes`
+        total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
+        for dtype in &self.datatypes {
+            total += dtype.size() - std::mem::size_of_val(dtype);
+        }
+
+        // Add size of the `self.ordering_req`
+        total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
+        // TODO: Calculate size of each `PhysicalSortExpr` more accurately.

Review Comment:
   I think it is ok to leave this as a TODO and be somewhat inaccurate here, because they are all `Arc`s and therefore there will only be one actual `PhysicalSortExpr` rather than one per group, so their contribution to the overall memory use should be small.
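
A minimal sketch of why shared `Arc`s add little per-group memory, using only the standard library. `Expr`, `handles_per_group` and `all_share_allocation` are hypothetical names for illustration and are not DataFusion types or functions; the sketch assumes each group's accumulator holds a clone of the same `Arc`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an expression that lives behind an `Arc`.
#[derive(Debug)]
struct Expr {
    name: String,
}

/// Clone one shared handle once per group, as each accumulator would.
/// `Arc::clone` copies the pointer and bumps a counter; it does not copy `Expr`.
fn handles_per_group(shared: &Arc<Expr>, n_groups: usize) -> Vec<Arc<Expr>> {
    (0..n_groups).map(|_| Arc::clone(shared)).collect()
}

/// Returns true when every handle points at the same heap allocation.
fn all_share_allocation(handles: &[Arc<Expr>]) -> bool {
    handles.windows(2).all(|w| Arc::ptr_eq(&w[0], &w[1]))
}
```

Under that assumption, the per-group cost is one pointer-sized handle, while the expression itself is allocated once.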



##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2580,12 +2580,280 @@ FRA 200 50 250
 
 # Run order-sensitive aggregators in multiple partitions
 statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 8;
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions also.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "country", index: 0 }], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], ordering_mode=Linear
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+FRA 50 200
+GRC 30 80
+TUR 75 100
+
+# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
+# contradictory requirements should work in multi partitions.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "country", index: 0 }], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)], ordering_mode=Linear
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+FRA 50 50
+GRC 30 30
+TUR 75 75
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions also.

Review Comment:
   ```suggestion
   # multi-partitions without group by also.
   ```



##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+    name: String,
+    input_data_type: DataType,
+    order_by_data_types: Vec<DataType>,
+    expr: Arc<dyn PhysicalExpr>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+    /// Create a new `OrderSensitiveArrayAgg` aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+        order_by_data_types: Vec<DataType>,
+        ordering_req: LexOrdering,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            input_data_type,
+            order_by_data_types,
+            ordering_req,
+        }
+    }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new_list(
+            &self.name,
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+            self.ordering_req.clone(),
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let mut fields = vec![Field::new_list(
+            format_state_name(&self.name, "array_agg"),
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        )];
+        let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
+        fields.push(Field::new_list(
+            format_state_name(&self.name, "array_agg_orderings"),
+            Field::new(
+                "item",
+                DataType::Struct(Fields::from(orderings.clone())),
+                true,
+            ),
+            false,
+        ));
+        fields.extend(orderings);
+        Ok(fields)
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.ordering_req.is_empty() {
+            None
+        } else {
+            Some(&self.ordering_req)
+        }
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.input_data_type == x.input_data_type
+                    && self.order_by_data_types == x.order_by_data_types
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+    values: Vec<ScalarValue>,

Review Comment:
   I think it would help to document what `values` and `ordering_values` store here. Perhaps referring to the very nice documentation on `merge_ordered_arrays` would be sufficient.



##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution

Review Comment:
   This comment appears to be incorrect, as this file contains an implementation of ARRAY_AGG that works with ordering.



##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+    name: String,
+    input_data_type: DataType,
+    order_by_data_types: Vec<DataType>,
+    expr: Arc<dyn PhysicalExpr>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+    /// Create a new `OrderSensitiveArrayAgg` aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+        order_by_data_types: Vec<DataType>,
+        ordering_req: LexOrdering,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            input_data_type,
+            order_by_data_types,
+            ordering_req,
+        }
+    }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new_list(
+            &self.name,
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+            self.ordering_req.clone(),
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let mut fields = vec![Field::new_list(
+            format_state_name(&self.name, "array_agg"),
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        )];
+        let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
+        fields.push(Field::new_list(
+            format_state_name(&self.name, "array_agg_orderings"),
+            Field::new(
+                "item",
+                DataType::Struct(Fields::from(orderings.clone())),
+                true,
+            ),
+            false,
+        ));
+        fields.extend(orderings);
+        Ok(fields)
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.ordering_req.is_empty() {
+            None
+        } else {
+            Some(&self.ordering_req)
+        }
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.input_data_type == x.input_data_type
+                    && self.order_by_data_types == x.order_by_data_types
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+    values: Vec<ScalarValue>,
+    ordering_values: Vec<Vec<ScalarValue>>,
+    datatypes: Vec<DataType>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    /// Create a new order-sensitive ARRAY_AGG accumulator based on the given
+    /// item data type.
+    pub fn try_new(
+        datatype: &DataType,
+        ordering_dtypes: &[DataType],
+        ordering_req: LexOrdering,
+    ) -> Result<Self> {
+        let mut datatypes = vec![datatype.clone()];
+        datatypes.extend(ordering_dtypes.iter().cloned());
+        Ok(Self {
+            values: vec![],
+            ordering_values: vec![],
+            datatypes,
+            ordering_req,
+        })
+    }
+}
+
+impl Accumulator for OrderSensitiveArrayAggAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let n_row = values[0].len();
+        for index in 0..n_row {
+            let row = get_row_at_idx(values, index)?;
+            self.values.push(row[0].clone());
+            self.ordering_values.push(row[1..].to_vec());
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+        // First entry in the state is the aggregation result.
+        let array_agg_values = &states[0];
+        // 2nd entry stores values received for ordering requirement columns, for each aggregation value inside ARRAY_AGG list.
+        // For each `ScalarValue` inside ARRAY_AGG list, we will receive a `Vec<ScalarValue>` that stores
+        // values received from its ordering requirement expression. (This information is necessary for during merging).
+        let agg_orderings = &states[1];
+        if agg_orderings.as_any().is::<ListArray>() {
+            // Stores ARRAY_AGG results coming from each partition
+            let mut partition_values = vec![];
+            // Stores ordering requirement expression results coming from each partition
+            let mut partition_ordering_values = vec![];
+            for index in 0..agg_orderings.len() {
+                let ordering = ScalarValue::try_from_array(agg_orderings, index)?;
+                // Ordering requirement expression values for each entry in the ARRAY_AGG list
+                let other_ordering_values =
+                    self.convert_array_agg_to_orderings(ordering)?;
+                // ARRAY_AGG result. (It is a `ScalarValue::List` under the hood, it stores `Vec<ScalarValue>`)
+                let array_agg_res = ScalarValue::try_from_array(array_agg_values, index)?;
+                if let ScalarValue::List(Some(other_values), _) = array_agg_res {
+                    partition_values.push(other_values);
+                    partition_ordering_values.push(other_ordering_values);
+                } else {
+                    return Err(DataFusionError::Internal(
+                        "ARRAY_AGG state must be list!".into(),
+                    ));
+                }
+            }
+            let sort_options = self
+                .ordering_req
+                .iter()
+                .map(|sort_expr| sort_expr.options)
+                .collect::<Vec<_>>();
+            self.values = merge_ordered_arrays(
+                &partition_values,
+                &partition_ordering_values,
+                &sort_options,
+            )?;
+        } else {
+            return Err(DataFusionError::Execution(
+                "Expects to receive a list array".to_string(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let mut result = vec![self.evaluate()?];
+        result.push(self.evaluate_orderings()?);
+        let last_ordering = if let Some(ordering) = self.ordering_values.last() {
+            ordering.clone()
+        } else {
+            // In case ordering is empty, construct ordering as NULL:
+            self.datatypes
+                .iter()
+                .skip(1)
+                .map(ScalarValue::try_from)
+                .collect::<Result<Vec<_>>>()?
+        };
+        result.extend(last_ordering);
+        Ok(result)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::new_list(
+            Some(self.values.clone()),
+            self.datatypes[0].clone(),
+        ))
+    }
+
+    fn size(&self) -> usize {
+        let mut total = std::mem::size_of_val(self)
+            + ScalarValue::size_of_vec(&self.values)
+            - std::mem::size_of_val(&self.values);
+
+        // Add size of the `self.ordering_values`
+        total +=
+            std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
+        for row in &self.ordering_values {
+            total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
+        }
+
+        // Add size of the `self.datatypes`
+        total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
+        for dtype in &self.datatypes {
+            total += dtype.size() - std::mem::size_of_val(dtype);
+        }
+
+        // Add size of the `self.ordering_req`
+        total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
+        // TODO: Calculate size of each `PhysicalSortExpr` more accurately.
+        total
+    }
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    fn convert_array_agg_to_orderings(
+        &self,
+        in_data: ScalarValue,
+    ) -> Result<Vec<Vec<ScalarValue>>> {
+        if let ScalarValue::List(Some(list_vals), _field_ref) = in_data {
+            list_vals.into_iter().map(|struct_vals| {
+                if let ScalarValue::Struct(Some(orderings), _fields) = struct_vals {
+                    Ok(orderings)
+                } else {
+                    Err(DataFusionError::Execution(format!(
+                        "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
+                        struct_vals.get_datatype()
+                    )))
+                }
+            }).collect::<Result<Vec<_>>>()
+        } else {
+            Err(DataFusionError::Execution(format!(
+                "Expects to receive ScalarValue::List(Some(..), _) but got:{:?}",
+                in_data.get_datatype()
+            )))
+        }
+    }
+
+    fn evaluate_orderings(&self) -> Result<ScalarValue> {
+        let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
+        let struct_field = Fields::from(fields.clone());
+        let orderings = self
+            .ordering_values
+            .iter()
+            .map(|ordering| {
+                ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
+            })
+            .collect();
+        let struct_type = DataType::Struct(Fields::from(fields));
+        Ok(ScalarValue::new_list(Some(orderings), struct_type))
+    }
+}
+
+/// This is a wrapper struct to be able to correctly merge ARRAY_AGG
+/// data from multiple partitions using `BinaryHeap`.
+/// When used inside `BinaryHeap` this struct returns smallest `CustomElement`,
+/// where smallest is determined by `ordering` values (`Vec<ScalarValue>`)
+/// according to `sort_options`
+#[derive(Debug, PartialEq, Eq)]
+struct CustomElement<'a> {
+    // Stores from which partition entry is received
+    branch_idx: usize,
+    // values to be merged
+    value: ScalarValue,
+    // according to `ordering` values, comparisons will be done.
+    ordering: Vec<ScalarValue>,
+    // `sort_options` defines, desired ordering by the user
+    sort_options: &'a [SortOptions],
+}
+
+impl<'a> CustomElement<'a> {
+    fn new(
+        branch_idx: usize,
+        value: ScalarValue,
+        ordering: Vec<ScalarValue>,
+        sort_options: &'a [SortOptions],
+    ) -> Self {
+        Self {
+            branch_idx,
+            value,
+            ordering,
+            sort_options,
+        }
+    }
+
+    fn ordering(
+        &self,
+        current: &[ScalarValue],
+        target: &[ScalarValue],
+    ) -> Result<Ordering> {
+        // Calculate ordering according to `sort_options`
+        compare_rows(current, target, self.sort_options)
+    }
+}
+
+// Overwrite ordering implementation such that
+// - `self.ordering` values are used for comparison,
+// - When used inside `BinaryHeap` it is a min-heap.
+impl<'a> Ord for CustomElement<'a> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // Compares according to custom ordering
+        self.ordering(&self.ordering, &other.ordering)
+            // Convert max heap to min heap
+            .map(|ordering| ordering.reverse())
+            // This function return error, when `self.ordering` and `other.ordering`
+            // have different types (such as one is `ScalarValue::Int64`, other is `ScalarValue::Float32`)
+            // Here this case won't happen, because data from each partition will have same type
+            .unwrap()
+    }
+}
+
+impl<'a> PartialOrd for CustomElement<'a> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single array `Vec<ScalarValue>`
+/// Merging done according to ordering values stored inside `ordering_values` (`&[Vec<Vec<ScalarValue>>]`)
+/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as ordering information for the
+/// each `ScalarValue` in the `values` array.
+/// Desired ordering specified by `sort_options` argument (Should have same size with inner `Vec<ScalarValue>`
+/// of the `ordering_values` array).
+///
+/// As an example
+/// values can be \[
+///      \[1, 2, 3, 4, 5\],
+///      \[1, 2, 3, 4\],
+///      \[1, 2, 3, 4, 5, 6\],
+/// \]
+/// In this case we will be merging three arrays (doesn't have to be same size)
+/// and produce a merged array with size 15 (sum of 5+4+6)
+/// Merging will be done according to ordering at `ordering_values` vector.
+/// As an example `ordering_values` can be [
+///      \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
+///      \[(1, a), (2, b), (3, b), (4, a) \],
+///      \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
+/// ]
+/// For each ScalarValue in the `values` we have a corresponding `Vec<ScalarValue>` (like timestamp of it)
+/// for the example above `sort_options` will have size two, that defines ordering requirement of the merge.
+/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared according `sort_options` (Their sizes should match)
+fn merge_ordered_arrays(
+    // We will merge values into single `Vec<ScalarValue>`.
+    values: &[Vec<ScalarValue>],
+    // `values` will be merged according to `ordering_values`.
+    // Inner `Vec<ScalarValue>` can be thought as ordering information for the
+    // each `ScalarValue` in the values`.
+    ordering_values: &[Vec<Vec<ScalarValue>>],
+    // Defines according to which ordering comparisons should be done.
+    sort_options: &[SortOptions],
+) -> Result<Vec<ScalarValue>> {
+    // Keep track the most recent data of each branch, in binary heap data structure.
+    let mut heap: BinaryHeap<CustomElement> = BinaryHeap::new();
+
+    if !(values.len() == ordering_values.len()
+        && values
+            .iter()
+            .zip(ordering_values.iter())
+            .all(|(vals, ordering_vals)| vals.len() == ordering_vals.len()))
+    {
+        return Err(DataFusionError::Execution(
+            "Expects values arguments and/or ordering_values arguments to have same size"
+                .to_string(),
+        ));
+    }
+    let n_branch = values.len();
+    // For each branch we keep track of indices of next will be merged entry

Review Comment:
   I do wonder if there is some kernel we could add for this operation in arrow-rs (this code is quite similar in spirit to what is in SortPreservingMerge 🤔 )
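
For reference, the general shape of such a k-way merge can be sketched with the standard library alone. This is not the PR's `merge_ordered_arrays` and not an existing arrow-rs kernel; `merge_sorted_runs` is a hypothetical name, a single `i64` key stands in for the ordering values, and only ascending order is handled.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several individually sorted runs into one sorted vector.
/// Each run plays the role of one partition's ARRAY_AGG state: the `i64`
/// is the ordering key and `V` is the aggregated value.
fn merge_sorted_runs<V: Clone>(runs: &[Vec<(i64, V)>]) -> Vec<V> {
    // Heap entries are (key, run index, position in run). `Reverse` turns
    // the standard max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some((key, _)) = run.first() {
            heap.push(Reverse((*key, run_idx, 0usize)));
        }
    }
    let mut merged = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((_, run_idx, pos))) = heap.pop() {
        merged.push(runs[run_idx][pos].1.clone());
        // Advance within the run that just produced the smallest key.
        if let Some((next_key, _)) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((*next_key, run_idx, pos + 1)));
        }
    }
    merged
}
```

Because the run index is part of the heap entry, equal keys are emitted in run order, which keeps the merge deterministic.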



##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -117,42 +153,57 @@ impl PartialEq<dyn Any> for FirstValue {
 #[derive(Debug)]
 struct FirstValueAccumulator {
     first: ScalarValue,
-    // At the beginning, `is_set` is `false`, this means `first` is not seen yet.
-    // Once we see (`is_set=true`) first value, we do not update `first`.
+    // At the beginning, `is_set` is false, which means `first` is not seen yet.
+    // Once we see the first value, we set the `is_set` flag and do not update `first` anymore.
     is_set: bool,
+    orderings: Vec<ScalarValue>,

Review Comment:
   I think it would help to explain what this field is storing (namely, the values of the `ORDER BY` columns, if present), so different intermediate values can be correctly merged
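
To illustrate why the ordering values have to travel with the intermediate state, here is a minimal sketch of merging partial `FIRST_VALUE` states. It is an assumption-laden simplification rather than the PR's implementation: `FirstValueState` and `merge_first_value_states` are hypothetical names, values and ordering keys are plain `i64`, and the ordering is assumed to be ascending on every key.

```rust
/// Simplified partial state of a `FIRST_VALUE(x ORDER BY ...)` accumulator.
/// Hypothetical type for illustration; the real accumulator uses `ScalarValue`.
#[derive(Debug, Clone, PartialEq)]
struct FirstValueState {
    /// Candidate first value seen by this partition.
    first: i64,
    /// False until the partition has seen at least one row.
    is_set: bool,
    /// Values of the ORDER BY columns for the row that produced `first`.
    orderings: Vec<i64>,
}

/// Picks the overall first value among partial states, assuming ascending order.
/// Returns `None` when no partition has seen any row.
fn merge_first_value_states(states: &[FirstValueState]) -> Option<i64> {
    states
        .iter()
        .filter(|state| state.is_set)
        // Lexicographic comparison of the ORDER BY values decides the winner.
        .min_by(|a, b| a.orderings.cmp(&b.orderings))
        .map(|state| state.first)
}
```

Without `orderings`, the final stage would only see one candidate value per partition and would have no way to tell which of them comes first under the requested `ORDER BY`.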





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238779399


##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -117,42 +153,57 @@ impl PartialEq<dyn Any> for FirstValue {
 #[derive(Debug)]
 struct FirstValueAccumulator {
     first: ScalarValue,
-    // At the beginning, `is_set` is `false`, this means `first` is not seen yet.
-    // Once we see (`is_set=true`) first value, we do not update `first`.
+    // At the beginning, `is_set` is false, which means `first` is not seen yet.

Review Comment:
   This can also be done. However, in that case we need to store the `DataType` in the state so that we can produce a `NULL` value of the correct type on output if no value is received before `evaluate` is called. We can pursue either approach; what do you think? Either we store the `is_set` flag, or we store the `DataType` of the expression inside the accumulator.
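
The trade-off described above can be sketched as follows. This assumes the alternative under discussion is replacing the `is_set` flag with an `Option`; the thread excerpt does not spell that out. All types here (`DataType`, `Scalar`, `FirstWithFlag`, `FirstWithType`) are hypothetical stand-ins, not the Arrow or DataFusion types of the same or similar names.

```rust
/// Hypothetical stand-in for a column data type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

/// Hypothetical stand-in for a typed scalar whose payload may be NULL.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int64(Option<i64>),
    Utf8(Option<String>),
}

impl Scalar {
    /// Builds a NULL of the requested type.
    fn null_of(data_type: DataType) -> Scalar {
        match data_type {
            DataType::Int64 => Scalar::Int64(None),
            DataType::Utf8 => Scalar::Utf8(None),
        }
    }
}

/// Approach A: hold a typed NULL from the start and track `is_set`.
struct FirstWithFlag {
    first: Scalar,
    is_set: bool,
}

impl FirstWithFlag {
    fn new(data_type: DataType) -> Self {
        Self { first: Scalar::null_of(data_type), is_set: false }
    }
    fn update(&mut self, value: Scalar) {
        if !self.is_set {
            self.first = value;
            self.is_set = true;
        }
    }
    fn evaluate(&self) -> Scalar {
        self.first.clone()
    }
}

/// Approach B: hold an `Option` and keep the type to build NULL on demand.
struct FirstWithType {
    first: Option<Scalar>,
    data_type: DataType,
}

impl FirstWithType {
    fn new(data_type: DataType) -> Self {
        Self { first: None, data_type }
    }
    fn update(&mut self, value: Scalar) {
        if self.first.is_none() {
            self.first = Some(value);
        }
    }
    fn evaluate(&self) -> Scalar {
        self.first
            .clone()
            .unwrap_or_else(|| Scalar::null_of(self.data_type))
    }
}
```

Both variants return a correctly typed NULL when `evaluate` is called before any `update`, and both keep a genuine NULL input as the first value. The difference is only in what extra piece of state the accumulator carries.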





[GitHub] [arrow-datafusion] alamb commented on pull request #6734: Add support for order-sensitive aggregation for multipartitions

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#issuecomment-1601380923

   I plan to review this PR carefully tomorrow

