Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/23 15:34:07 UTC

[arrow-datafusion] branch main updated: Add support for order-sensitive aggregation for multipartitions (#6734)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e60e9d2eda Add support for order-sensitive aggregation for multipartitions (#6734)
e60e9d2eda is described below

commit e60e9d2edafdc931720ec4e7d053ce946daf61a7
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Fri Jun 23 18:34:02 2023 +0300

    Add support for order-sensitive aggregation for multipartitions (#6734)
    
    * Naive test passes
    
    * Add new tests and simplifications
    
    * move tests to the .slt file
    
    * update requirement
    
    * update tests
    
    * Add support for order-sensitive aggregation in PartiallyOrdered mode.
    
    * Resolve linter errors
    
    * update comments
    
    * minor changes
    
    * retract changes in generated
    
    * update proto files
    
    * Simplifications
    
    * Make types consistent in schema and data
    
    * Update todos
    
    * Convert API to vector
    
    * Convert get_finest to handle Vector inputs
    
    * simplifications, update comment
    
    * initial commit, add test
    
    * Add support for FIRST Aggregate function.
    
    * Add support for last aggregate
    
    * Update cargo.lock
    
    * Remove distinct and limit from First and Last aggregates.
    
    * Add reverse for First and Last Aggregator
    
    * Update cargo lock
    
    * Minor code simplifications
    
    * Update comment
    
    * Update documents
    
    * Fix projection pushdown bug
    
    * fix projection push down failure bug
    
    * combine first_agg and last_agg parsers
    
    * Update documentation
    
    * Update subproject
    
    * initial commit
    
    * Add test code
    
    * initial version
    
    * simplify prints
    
    * minor changes
    
    * sqllogictests pass
    
    * All tests pass
    
    * update proto function names
    
    * Minor changes
    
    * Do not consider ordering requirements in order-insensitive aggregators
    
    * Reject aggregate order by for window functions.
    
    * initial commit
    
    * Add new tests
    
    * Add reverse for sort removal
    
    * simplifications
    
    * simplifications
    
    * Bug fix: do not consider the reverse requirement if it is not possible
    
    * Fix cargo lock file
    
    * Change reverse_order_by function place
    
    * Move required input ordering calculation logic to its own function
    
    * Add new tests
    
    * simplifications
    
    * Update comment
    
    * Rename aggregator first and last
    
    * minor change
    
    * Comment improvements
    
    * Remove count from First,Last accumulators
    
    * Remove clone
    
    * Simplifications
    
    * Simplifications, update comment
    
    * Improve comments
    
    * Move LexOrdering requirement to common place
    
    * Update comment, refactor implementation
    
    * Bug fix:
    
    * Use the naive requirement, by convention, when the reverse requirement is not helpful.
    
    * Update test
    
    * Update comments
    
    * Change function place
    
    * Move get_finer to utils
    
    * Change name of last, first impl
    
    * Fix error message
    
    * initial commit
    
    * initial impl
    
    * all slt tests pass
    
    * Multipartition first, last aggregator
    
    * Add new slt tests
    
    * all tests pass
    
    * resolve linter errors
    
    * minor changes
    
    * Add new tests
    
    * array_agg multi partition implementation
    
    * Resolve linter errors
    
    * remove unnecessary prints
    
    * minor changes
    
    * minor changes
    
    * remove unnecessary prints
    
    * Simplifications
    
    * Simplifications
    
    * Add order by expressions in AggregateExpr
    
    * Update size calculation
    
    * Minor changes
    
    * Linear array merge implementation
    
    * Change merge algorithm to heap
    
    * Handle distinct and order-sensitive case
    
    * Update comments
    
    * Address reviews
    
    * Improve comments, use functional style when applicable, better attribute naming
    
    * Address reviews
    
    * Address reviews
    
    * Fix tests
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
    Co-authored-by: berkaysynnada <be...@synnada.ai>
---
 .../core/src/physical_plan/aggregates/mod.rs       | 155 +++--
 datafusion/core/src/physical_plan/windows/mod.rs   |  10 +-
 datafusion/core/src/physical_planner.rs            |  16 +-
 .../tests/sqllogictests/test_files/groupby.slt     | 294 +++++++++-
 .../src/aggregate/array_agg_ordered.rs             | 621 +++++++++++++++++++++
 datafusion/physical-expr/src/aggregate/build_in.rs |  39 +-
 .../physical-expr/src/aggregate/first_last.rs      | 203 +++++--
 datafusion/physical-expr/src/aggregate/mod.rs      |  14 +-
 datafusion/physical-expr/src/aggregate/utils.rs    |  24 +-
 datafusion/physical-expr/src/expressions/mod.rs    |   1 +
 datafusion/proto/proto/datafusion.proto            |   1 +
 datafusion/proto/src/generated/pbjson.rs           |  18 +
 datafusion/proto/src/generated/prost.rs            |   2 +
 datafusion/proto/src/physical_plan/mod.rs          |   4 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |  13 +
 15 files changed, 1274 insertions(+), 141 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 8e68450078..343f7628b7 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -81,6 +81,8 @@ pub enum AggregateMode {
 /// Group By expression modes
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum GroupByOrderMode {
+    /// None of the expressions in the GROUP BY clause have an ordering.
+    None,
     /// Some of the expressions in the GROUP BY clause have an ordering.
     // For example, if the input is ordered by a, b, c and we group by b, a, d;
     // the mode will be `PartiallyOrdered` meaning a subset of group b, a, d
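
For intuition, here is a minimal, self-contained Rust sketch of how such a mode could be derived from an input ordering. The string-based columns and the `order_mode` helper are illustrative stand-ins only; the actual logic (`calc_aggregation_ordering`, used further below) works on physical expressions and equivalence properties.

    #[derive(Debug, PartialEq)]
    enum GroupByOrderMode {
        None,
        PartiallyOrdered,
        FullyOrdered,
    }

    // Count how long a prefix of the input ordering consists of GROUP BY
    // columns, then classify the aggregation mode accordingly.
    fn order_mode(input_ordering: &[&str], group_by: &[&str]) -> GroupByOrderMode {
        let prefix_len = input_ordering
            .iter()
            .take_while(|col| group_by.contains(*col))
            .count();
        if prefix_len == 0 {
            GroupByOrderMode::None
        } else if prefix_len >= group_by.len() {
            GroupByOrderMode::FullyOrdered
        } else {
            GroupByOrderMode::PartiallyOrdered
        }
    }

    fn main() {
        // Input ordered by a, b, c; grouping by b, a, d: only the {a, b} subset
        // is ordered, so the mode is PartiallyOrdered (as the comment above says).
        assert_eq!(
            order_mode(&["a", "b", "c"], &["b", "a", "d"]),
            GroupByOrderMode::PartiallyOrdered
        );
        // Grouping by a, b on the same input is FullyOrdered.
        assert_eq!(
            order_mode(&["a", "b", "c"], &["a", "b"]),
            GroupByOrderMode::FullyOrdered
        );
    }
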
@@ -429,10 +431,12 @@ fn get_finest_requirement<
 /// ordering requirements of order-sensitive aggregation functions.
 fn calc_required_input_ordering(
     input: &Arc<dyn ExecutionPlan>,
-    aggr_expr: &mut [Arc<dyn AggregateExpr>],
+    aggr_exprs: &mut [Arc<dyn AggregateExpr>],
+    order_by_exprs: &mut [Option<LexOrdering>],
     aggregator_reqs: LexOrderingReq,
     aggregator_reverse_reqs: Option<LexOrderingReq>,
-    aggregation_ordering: &Option<AggregationOrdering>,
+    aggregation_ordering: &mut Option<AggregationOrdering>,
+    mode: &AggregateMode,
 ) -> Result<Option<LexOrderingReq>> {
     let mut required_input_ordering = vec![];
     // Boolean shows that whether `required_input_ordering` stored comes from
@@ -459,14 +463,14 @@ fn calc_required_input_ordering(
             // then we append the aggregator ordering requirement to the existing
             // ordering. This way, we can still run with bounded memory.
             mode: GroupByOrderMode::FullyOrdered | GroupByOrderMode::PartiallyOrdered,
-            ..
+            order_indices,
         }) = aggregation_ordering
         {
             // Get the section of the input ordering that enables us to run in
             // FullyOrdered or PartiallyOrdered modes:
             let requirement_prefix =
                 if let Some(existing_ordering) = input.output_ordering() {
-                    &existing_ordering[0..ordering.len()]
+                    &existing_ordering[0..order_indices.len()]
                 } else {
                     &[]
                 };
@@ -474,21 +478,44 @@ fn calc_required_input_ordering(
                 PhysicalSortRequirement::from_sort_exprs(requirement_prefix.iter());
             for req in aggregator_requirement {
                 if requirement.iter().all(|item| req.expr.ne(&item.expr)) {
-                    requirement.push(req);
+                    requirement.push(req.clone());
+                }
+                // In partial mode, append the required ordering of the aggregator to the output
+                // ordering. With multiple partitions, this enables us to combine partitions correctly.
+                if matches!(mode, AggregateMode::Partial)
+                    && ordering.iter().all(|item| req.expr.ne(&item.expr))
+                {
+                    ordering.push(req.into());
                 }
             }
             required_input_ordering = requirement;
         } else {
+            // If there was no pre-existing output ordering, the output ordering is simply the required
+            // ordering of the aggregator in partial mode.
+            if matches!(mode, AggregateMode::Partial)
+                && !aggregator_requirement.is_empty()
+            {
+                *aggregation_ordering = Some(AggregationOrdering {
+                    mode: GroupByOrderMode::None,
+                    order_indices: vec![],
+                    ordering: PhysicalSortRequirement::to_sort_exprs(
+                        aggregator_requirement.clone(),
+                    ),
+                });
+            }
             required_input_ordering = aggregator_requirement;
         }
-        // keep track of from which direction required_input_ordering is constructed
+        // Keep track of the direction from which required_input_ordering is constructed:
         reverse_req = is_reverse;
-        // If all of the order-sensitive aggregate functions are reversible (such as all of the order-sensitive aggregators are
-        // either FIRST_VALUE or LAST_VALUE). We can run aggregate expressions both in the direction of naive required ordering
-        // (e.g finest requirement that satisfy each aggregate function requirement) and in its reversed (opposite) direction.
-        // We analyze these two possibilities, and use the version that satisfies existing ordering (This saves us adding
-        // unnecessary SortExec to the final plan). If none of the versions satisfy existing ordering, we use naive required ordering.
-        // In short, if running aggregators in reverse order, helps us to remove a `SortExec`, we do so. Otherwise, we use aggregators as is.
+        // If all the order-sensitive aggregate functions are reversible (e.g. all the
+        // order-sensitive aggregators are either FIRST_VALUE or LAST_VALUE), then we can
+        // run aggregate expressions either in the given required ordering (i.e. the finest
+        // requirement that satisfies every aggregate function requirement) or in its reverse
+        // (opposite) direction. We analyze these two possibilities and use the version that
+        // satisfies the existing ordering. This enables us to avoid an extra sort step in the
+        // final plan. If neither version satisfies the existing ordering, we use the given
+        // ordering requirement. In short, if running aggregators in reverse order helps us
+        // avoid a sorting step, we do so. Otherwise, we use the aggregators as is.
         let existing_ordering = input.output_ordering().unwrap_or(&[]);
         if ordering_satisfy_requirement_concrete(
             existing_ordering,
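
The comment above captures the heart of the change: compute the finest requirement, check it and its reverse against the input's existing ordering, and flip the aggregators if the reverse wins. A minimal sketch of that selection logic, using hypothetical string-keyed sort requirements instead of DataFusion's `PhysicalSortRequirement` machinery:

    #[derive(Clone, Copy, PartialEq, Debug)]
    struct SortKey {
        column: &'static str,
        descending: bool,
    }

    // Reverse every sort direction in a requirement (what `reverse_order_bys` does).
    fn reverse(req: &[SortKey]) -> Vec<SortKey> {
        req.iter()
            .map(|k| SortKey { descending: !k.descending, ..*k })
            .collect()
    }

    // A requirement is satisfied if it is a prefix of the existing ordering.
    fn satisfies(existing: &[SortKey], req: &[SortKey]) -> bool {
        req.len() <= existing.len() && existing[..req.len()] == *req
    }

    fn main() {
        // Input already sorted by ts ASC; the aggregator asks for ts DESC,
        // e.g. LAST_VALUE(amount ORDER BY ts DESC).
        let existing = [SortKey { column: "ts", descending: false }];
        let naive = [SortKey { column: "ts", descending: true }];
        let reversed = reverse(&naive);
        // The reverse requirement matches the existing ordering, so the planner
        // can flip LAST_VALUE into FIRST_VALUE and avoid an extra SortExec.
        let use_reverse = !satisfies(&existing, &naive) && satisfies(&existing, &reversed);
        assert!(use_reverse);
    }
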
@@ -499,17 +526,20 @@ fn calc_required_input_ordering(
             break;
         }
     }
-    // If `required_input_ordering` is constructed using reverse requirement, we should reverse
-    // each `aggr_expr` to be able to correctly calculate their result in reverse order.
+    // If `required_input_ordering` is constructed using the reverse requirement, we
+    // should reverse each `aggr_expr` in order to correctly calculate their results
+    // in reverse order.
     if reverse_req {
-        aggr_expr
+        aggr_exprs
             .iter_mut()
-            .map(|elem| {
-                if is_order_sensitive(elem) {
-                    if let Some(reverse) = elem.reverse_expr() {
-                        *elem = reverse;
+            .zip(order_by_exprs.iter_mut())
+            .map(|(aggr_expr, ob_expr)| {
+                if is_order_sensitive(aggr_expr) {
+                    if let Some(reverse) = aggr_expr.reverse_expr() {
+                        *aggr_expr = reverse;
+                        *ob_expr = ob_expr.as_ref().map(|obs| reverse_order_bys(obs));
                     } else {
-                        return Err(DataFusionError::Execution(
+                        return Err(DataFusionError::Plan(
                             "Aggregate expression should have a reverse expression"
                                 .to_string(),
                         ));
@@ -529,6 +559,7 @@ impl AggregateExec {
         group_by: PhysicalGroupBy,
         mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
+        // Ordering requirement of each aggregate expression
         mut order_by_expr: Vec<Option<LexOrdering>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
@@ -555,8 +586,6 @@ impl AggregateExec {
                 }
             })
             .collect::<Vec<_>>();
-
-        let mut aggregator_reqs = vec![];
         let mut aggregator_reverse_reqs = None;
         // Currently we support order-sensitive aggregation only in `Single` mode.
         // For `Final` and `FinalPartitioned` modes, we cannot guarantee they will receive
@@ -564,31 +593,27 @@ impl AggregateExec {
         // in `Final` mode, it is not important to produce correct result in `Partial` mode.
         // We only support `Single` mode, where we are sure that output produced is final, and it
         // is produced in a single step.
-        if mode == AggregateMode::Single {
-            let requirement = get_finest_requirement(
-                &mut aggr_expr,
-                &mut order_by_expr,
-                || input.equivalence_properties(),
-                || input.ordering_equivalence_properties(),
-            )?;
-            let aggregator_requirement = requirement
-                .as_ref()
-                .map(|exprs| PhysicalSortRequirement::from_sort_exprs(exprs.iter()));
-            aggregator_reqs = aggregator_requirement.unwrap_or(vec![]);
-            // If all aggregate expressions are reversible, also consider reverse
-            // requirement(s). The reason is that existing ordering may satisfy the
-            // given requirement or its reverse. By considering both, we can generate better plans.
-            if aggr_expr
-                .iter()
-                .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some())
-            {
-                let reverse_agg_requirement = requirement.map(|reqs| {
-                    PhysicalSortRequirement::from_sort_exprs(
-                        reverse_order_bys(&reqs).iter(),
-                    )
-                });
-                aggregator_reverse_reqs = reverse_agg_requirement;
-            }
+
+        let requirement = get_finest_requirement(
+            &mut aggr_expr,
+            &mut order_by_expr,
+            || input.equivalence_properties(),
+            || input.ordering_equivalence_properties(),
+        )?;
+        let aggregator_requirement = requirement
+            .as_ref()
+            .map(|exprs| PhysicalSortRequirement::from_sort_exprs(exprs.iter()));
+        let aggregator_reqs = aggregator_requirement.unwrap_or(vec![]);
+        // If all aggregate expressions are reversible, also consider reverse
+        // requirement(s). The reason is that existing ordering may satisfy the
+        // given requirement or its reverse. By considering both, we can generate better plans.
+        if aggr_expr
+            .iter()
+            .all(|expr| !is_order_sensitive(expr) || expr.reverse_expr().is_some())
+        {
+            aggregator_reverse_reqs = requirement.map(|reqs| {
+                PhysicalSortRequirement::from_sort_exprs(reverse_order_bys(&reqs).iter())
+            });
         }
 
         // construct a map from the input columns to the output columns of the Aggregation
@@ -601,26 +626,18 @@ impl AggregateExec {
             };
         }
 
-        let aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
+        let mut aggregation_ordering = calc_aggregation_ordering(&input, &group_by);
 
         let required_input_ordering = calc_required_input_ordering(
             &input,
             &mut aggr_expr,
+            &mut order_by_expr,
             aggregator_reqs,
             aggregator_reverse_reqs,
-            &aggregation_ordering,
+            &mut aggregation_ordering,
+            &mode,
         )?;
 
-        // If aggregator is working on multiple partitions and there is an order-sensitive aggregator with a requirement return error.
-        if input.output_partitioning().partition_count() > 1
-            && order_by_expr.iter().any(|req| req.is_some())
-        {
-            return Err(DataFusionError::NotImplemented(
-                "Order-sensitive aggregators is not supported on multiple partitions"
-                    .to_string(),
-            ));
-        }
-
         Ok(AggregateExec {
             mode,
             group_by,
@@ -1001,14 +1018,28 @@ fn aggregate_expressions(
                 } else {
                     None
                 };
-                agg.expressions()
+                let mut result = agg
+                    .expressions()
                     .into_iter()
                     .map(|expr| {
                         pre_cast_type.clone().map_or(expr.clone(), |cast_type| {
                             Arc::new(CastExpr::new(expr, cast_type, None))
                         })
                     })
-                    .collect::<Vec<_>>()
+                    .collect::<Vec<_>>();
+                // In partial mode, append the ordering requirement expressions to the output
+                // expressions. Subsequent executors use them to satisfy the required ordering
+                // in the `AggregateMode::FinalPartitioned`/`AggregateMode::Final` modes.
+                if matches!(mode, AggregateMode::Partial) {
+                    if let Some(ordering_req) = agg.order_bys() {
+                        let ordering_exprs = ordering_req
+                            .iter()
+                            .map(|item| item.expr.clone())
+                            .collect::<Vec<_>>();
+                        result.extend(ordering_exprs);
+                    }
+                }
+                result
             })
             .collect()),
         // in this mode, we build the merge expressions of the aggregation
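
Concretely, for an aggregate such as FIRST_VALUE(amount ORDER BY ts), the partial stage now evaluates [amount, ts] rather than just [amount], so the ordering values travel alongside the partial state. A simplified sketch of that shape, with strings standing in for `Arc<dyn PhysicalExpr>`:

    // Simplified sketch: strings stand in for physical expressions.
    fn partial_mode_expressions(agg_args: &[&str], order_bys: Option<&[&str]>) -> Vec<String> {
        let mut result: Vec<String> = agg_args.iter().map(|s| s.to_string()).collect();
        // Append the ordering expressions so the Final/FinalPartitioned stage can
        // re-establish the required ordering when merging partial states.
        if let Some(order_req) = order_bys {
            result.extend(order_req.iter().map(|s| s.to_string()));
        }
        result
    }

    fn main() {
        // FIRST_VALUE(amount ORDER BY ts ASC): the partial stage evaluates both columns.
        let exprs = partial_mode_expressions(&["amount"], Some(&["ts"]));
        assert_eq!(exprs, vec!["amount".to_string(), "ts".to_string()]);
    }
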
@@ -1903,6 +1934,8 @@ mod tests {
             Arc::new(col_a.clone()),
             "first1",
             DataType::Int32,
+            vec![],
+            vec![],
         )) as _;
         let mut aggr_exprs = vec![aggr_expr; order_by_exprs.len()];
         let res = get_finest_requirement(
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index 261cfa04bf..88fafe99b4 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -67,8 +67,14 @@ pub fn create_window_expr(
 ) -> Result<Arc<dyn WindowExpr>> {
     Ok(match fun {
         WindowFunction::AggregateFunction(fun) => {
-            let aggregate =
-                aggregates::create_aggregate_expr(fun, false, args, input_schema, name)?;
+            let aggregate = aggregates::create_aggregate_expr(
+                fun,
+                false,
+                args,
+                &[],
+                input_schema,
+                name,
+            )?;
             window_expr_from_aggregate_expr(
                 partition_by,
                 order_by,
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index aa0cc52daf..75566208e3 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1652,13 +1652,6 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
                 )?),
                 None => None,
             };
-            let agg_expr = aggregates::create_aggregate_expr(
-                fun,
-                *distinct,
-                &args,
-                physical_input_schema,
-                name,
-            )?;
             let order_by = match order_by {
                 Some(e) => Some(
                     e.iter()
@@ -1674,6 +1667,15 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
                 ),
                 None => None,
             };
+            let ordering_reqs = order_by.clone().unwrap_or(vec![]);
+            let agg_expr = aggregates::create_aggregate_expr(
+                fun,
+                *distinct,
+                &args,
+                &ordering_reqs,
+                physical_input_schema,
+                name,
+            )?;
             Ok((agg_expr, filter, order_by))
         }
         Expr::AggregateUDF(AggregateUDF {
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index ca400e0ef4..597f2c1611 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2076,7 +2076,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
@@ -2084,10 +2084,10 @@ SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24
+0 1 49
+1 2 74
+1 3 99
 
 # test_source_sorted_groupby4
 
@@ -2102,7 +2102,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a
 ----TableScan: annotated_data_infinite2 projection=[a, b, c]
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
 
 query III
@@ -2110,10 +2110,10 @@ SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
   FROM annotated_data_infinite2
   GROUP BY a, b
 ----
-0 0 24
-0 1 49
-1 2 74
-1 3 99
+0 0 0
+0 1 25
+1 2 50
+1 3 75
 
 # when LAST_VALUE, or FIRST_VALUE value do not contain ordering requirement
 # queries should still work, However, result depends on the scanning order and
@@ -2580,12 +2580,280 @@ FRA 200 50 250
 
 # Run order-sensitive aggregators in multiple partitions
 statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 8;
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multiple partitions with a GROUP BY.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], ordering_mode=None
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+FRA 50 200
+GRC 30 80
+TUR 75 100
+
+# Conversion between FIRST_VALUE and LAST_VALUE to resolve
+# contradictory requirements should work in multiple partitions.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)], ordering_mode=None
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+FRA 50 50
+GRC 30 30
+TUR 75 75
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should also work in
+# multiple partitions without a GROUP BY.
+query TT
+EXPLAIN SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+  FROM sales_global
+----
+logical_plan
+Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv2
+--Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+----TableScan: sales_global projection=[ts, amount]
+physical_plan
+ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv2]
+--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----SortPreservingMergeExec: [ts@0 ASC NULLS LAST]
+------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[ts@0 ASC NULLS LAST]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query RR
+SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+  FROM sales_global
+----
+30 80
+
+# Conversion between FIRST_VALUE and LAST_VALUE to resolve
+# contradictory requirements should work in multiple partitions.
+query TT
+EXPLAIN SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+  FROM sales_global
+----
+logical_plan
+Projection: FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS fv2
+--Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[ts, amount]
+physical_plan
+ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2]
+--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+----SortPreservingMergeExec: [ts@0 ASC NULLS LAST]
+------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[ts@0 ASC NULLS LAST]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query RR
+SELECT FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+  FROM sales_global
+----
+30 30
 
-# Currently, we do not support running order-sensitive aggregators in multiple partitions.
-statement error This feature is not implemented: Order-sensitive aggregators is not supported on multiple partitions
+# ARRAY_AGG should work in multiple partitions
+query TT
+EXPLAIN SELECT ARRAY_AGG(amount ORDER BY ts ASC) AS array_agg1
+  FROM sales_global
+----
+logical_plan
+Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST] AS array_agg1
+--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]]]
+----TableScan: sales_global projection=[ts, amount]
+physical_plan
+ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as array_agg1]
+--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+----SortPreservingMergeExec: [ts@0 ASC NULLS LAST]
+------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[ts@0 ASC NULLS LAST]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ?
+SELECT ARRAY_AGG(amount ORDER BY ts ASC) AS array_agg1
+  FROM sales_global
+----
+[30.0, 50.0, 75.0, 200.0, 100.0, 80.0]
+
+# ARRAY_AGG should work in multiple partitions
+query TT
+EXPLAIN SELECT ARRAY_AGG(amount ORDER BY ts DESC) AS array_agg1
+  FROM sales_global
+----
+logical_plan
+Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST] AS array_agg1
+--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]]]
+----TableScan: sales_global projection=[ts, amount]
+physical_plan
+ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@0 as array_agg1]
+--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+----SortPreservingMergeExec: [ts@0 DESC]
+------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[ts@0 DESC]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ?
+SELECT ARRAY_AGG(amount ORDER BY ts DESC) AS array_agg1
+  FROM sales_global
+----
+[100.0, 80.0, 200.0, 75.0, 50.0, 30.0]
+
+# ARRAY_AGG should work in multiple partitions
+query TT
+EXPLAIN SELECT ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1
+  FROM sales_global
+----
+logical_plan
+Projection: ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS array_agg1
+--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+----TableScan: sales_global projection=[amount]
+physical_plan
+ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@0 as array_agg1]
+--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)]
+----SortPreservingMergeExec: [amount@0 ASC NULLS LAST]
+------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)], ordering_mode=None
+--------SortExec: expr=[amount@0 ASC NULLS LAST]
+----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ?
+SELECT ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1
+  FROM sales_global
+----
+[30.0, 50.0, 75.0, 80.0, 100.0, 200.0]
+
+# ARRAY_AGG should work in multiple partitions
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS array_agg1
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]]]
+------TableScan: sales_global projection=[country, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as array_agg1]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
+--------SortExec: expr=[amount@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)], ordering_mode=None
+----------------SortExec: expr=[amount@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?
+SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS array_agg1
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+FRA [50.0, 200.0]
+GRC [30.0, 80.0]
+TUR [75.0, 100.0]
+
+# ARRAY_AGG, FIRST_VALUE, LAST_VALUE should work in multiple partitions
+query TT
+EXPLAIN SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
+  FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
+  LAST_VALUE(amount ORDER BY amount DESC) AS fv2
+  FROM sales_global
+  GROUP BY country
+  ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]], aggr=[[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]]]
+------TableScan: sales_global projection=[country, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[amount@1 DESC]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], ordering_mode=None
+----------------SortExec: expr=[amount@1 DESC]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query T?RR
 SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
   FIRST_VALUE(amount ORDER BY amount ASC) AS fv1,
   LAST_VALUE(amount ORDER BY amount DESC) AS fv2
   FROM sales_global
   GROUP BY country
+  ORDER BY country
+----
+FRA [200.0, 50.0] 50 50
+GRC [80.0, 30.0] 30 30
+TUR [100.0, 75.0] 75 75
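
The new .slt coverage above can also be exercised through the public API. A minimal sketch, assuming a `sales_global` table has been registered and using the `SessionContext`/`SessionConfig` API of this era (exact method names may vary between versions):

    use datafusion::error::Result;
    use datafusion::prelude::{SessionConfig, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        // Multiple target partitions used to make order-sensitive aggregates fail
        // with NotImplemented; now the planner builds a two-phase Partial/Final plan.
        let config = SessionConfig::new().with_target_partitions(8);
        let ctx = SessionContext::with_config(config);
        // Register the `sales_global` table here (e.g. via ctx.register_csv).
        let df = ctx
            .sql(
                "SELECT country, \
                        FIRST_VALUE(amount ORDER BY ts ASC) AS fv1, \
                        LAST_VALUE(amount ORDER BY ts ASC) AS fv2 \
                 FROM sales_global GROUP BY country ORDER BY country",
            )
            .await?;
        df.show().await?;
        Ok(())
    }
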
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
new file mode 100644
index 0000000000..f0eb442af0
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -0,0 +1,621 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions which specify an ordering requirement
+//! that can be evaluated at runtime during query execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for an ARRAY_AGG(... ORDER BY ...) aggregation. When an
+/// aggregation runs over multiple partitions, each partition produces an
+/// intermediate result that is later merged. This aggregator is a version
+/// of ARRAY_AGG that can produce such an intermediate aggregation (with
+/// the necessary side information) and that can merge aggregations
+/// coming from multiple partitions.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+    name: String,
+    input_data_type: DataType,
+    order_by_data_types: Vec<DataType>,
+    expr: Arc<dyn PhysicalExpr>,
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+    /// Create a new `OrderSensitiveArrayAgg` aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+        order_by_data_types: Vec<DataType>,
+        ordering_req: LexOrdering,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            input_data_type,
+            order_by_data_types,
+            ordering_req,
+        }
+    }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new_list(
+            &self.name,
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+            self.ordering_req.clone(),
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        let mut fields = vec![Field::new_list(
+            format_state_name(&self.name, "array_agg"),
+            Field::new("item", self.input_data_type.clone(), true),
+            false,
+        )];
+        let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
+        fields.push(Field::new_list(
+            format_state_name(&self.name, "array_agg_orderings"),
+            Field::new(
+                "item",
+                DataType::Struct(Fields::from(orderings.clone())),
+                true,
+            ),
+            false,
+        ));
+        fields.extend(orderings);
+        Ok(fields)
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.ordering_req.is_empty() {
+            None
+        } else {
+            Some(&self.ordering_req)
+        }
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.input_data_type == x.input_data_type
+                    && self.order_by_data_types == x.order_by_data_types
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+    // `values` stores entries in the ARRAY_AGG result.
+    values: Vec<ScalarValue>,
+    // `ordering_values` stores values of the ordering requirement expressions
+    // corresponding to each value in the ARRAY_AGG result.
+    // For each `ScalarValue` inside `values`, there is a corresponding
+    // `Vec<ScalarValue>` inside `ordering_values` which stores its ordering.
+    // This information is used when merging results from different partitions.
+    // For details on how merging is done, see [`merge_ordered_arrays`].
+    ordering_values: Vec<Vec<ScalarValue>>,
+    // `datatypes` stores the datatype of the expression inside ARRAY_AGG and
+    // of the ordering requirement expressions.
+    datatypes: Vec<DataType>,
+    // Stores the ordering requirement of the accumulator.
+    ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    /// Create a new order-sensitive ARRAY_AGG accumulator based on the given
+    /// item data type.
+    pub fn try_new(
+        datatype: &DataType,
+        ordering_dtypes: &[DataType],
+        ordering_req: LexOrdering,
+    ) -> Result<Self> {
+        let mut datatypes = vec![datatype.clone()];
+        datatypes.extend(ordering_dtypes.iter().cloned());
+        Ok(Self {
+            values: vec![],
+            ordering_values: vec![],
+            datatypes,
+            ordering_req,
+        })
+    }
+}
+
+impl Accumulator for OrderSensitiveArrayAggAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let n_row = values[0].len();
+        for index in 0..n_row {
+            let row = get_row_at_idx(values, index)?;
+            self.values.push(row[0].clone());
+            self.ordering_values.push(row[1..].to_vec());
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+        // First entry in the state is the aggregation result.
+        let array_agg_values = &states[0];
+        // The second entry stores values received for the ordering requirement
+        // columns, for each aggregation value inside the ARRAY_AGG list. For each
+        // `ScalarValue` inside the ARRAY_AGG list, we receive a `Vec<ScalarValue>`
+        // that stores the values received from its ordering requirement expression.
+        // (This information is necessary during merging.)
+        let agg_orderings = &states[1];
+        if agg_orderings.as_any().is::<ListArray>() {
+            // Stores ARRAY_AGG results coming from each partition
+            let mut partition_values = vec![];
+            // Stores ordering requirement expression results coming from each partition
+            let mut partition_ordering_values = vec![];
+            for index in 0..agg_orderings.len() {
+                let ordering = ScalarValue::try_from_array(agg_orderings, index)?;
+                // Ordering requirement expression values for each entry in the ARRAY_AGG list
+                let other_ordering_values =
+                    self.convert_array_agg_to_orderings(ordering)?;
+                // ARRAY_AGG result. (It is a `ScalarValue::List` under the hood, it stores `Vec<ScalarValue>`)
+                let array_agg_res = ScalarValue::try_from_array(array_agg_values, index)?;
+                if let ScalarValue::List(Some(other_values), _) = array_agg_res {
+                    partition_values.push(other_values);
+                    partition_ordering_values.push(other_ordering_values);
+                } else {
+                    return Err(DataFusionError::Internal(
+                        "ARRAY_AGG state must be list!".into(),
+                    ));
+                }
+            }
+            let sort_options = self
+                .ordering_req
+                .iter()
+                .map(|sort_expr| sort_expr.options)
+                .collect::<Vec<_>>();
+            self.values = merge_ordered_arrays(
+                &partition_values,
+                &partition_ordering_values,
+                &sort_options,
+            )?;
+        } else {
+            return Err(DataFusionError::Execution(
+                "Expects to receive a list array".to_string(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let mut result = vec![self.evaluate()?];
+        result.push(self.evaluate_orderings()?);
+        let last_ordering = if let Some(ordering) = self.ordering_values.last() {
+            ordering.clone()
+        } else {
+            // In case ordering is empty, construct ordering as NULL:
+            self.datatypes
+                .iter()
+                .skip(1)
+                .map(ScalarValue::try_from)
+                .collect::<Result<Vec<_>>>()?
+        };
+        result.extend(last_ordering);
+        Ok(result)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::new_list(
+            Some(self.values.clone()),
+            self.datatypes[0].clone(),
+        ))
+    }
+
+    fn size(&self) -> usize {
+        let mut total = std::mem::size_of_val(self)
+            + ScalarValue::size_of_vec(&self.values)
+            - std::mem::size_of_val(&self.values);
+
+        // Add size of the `self.ordering_values`
+        total +=
+            std::mem::size_of::<Vec<ScalarValue>>() * self.ordering_values.capacity();
+        for row in &self.ordering_values {
+            total += ScalarValue::size_of_vec(row) - std::mem::size_of_val(row);
+        }
+
+        // Add size of the `self.datatypes`
+        total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
+        for dtype in &self.datatypes {
+            total += dtype.size() - std::mem::size_of_val(dtype);
+        }
+
+        // Add size of the `self.ordering_req`
+        total += std::mem::size_of::<PhysicalSortExpr>() * self.ordering_req.capacity();
+        // TODO: Calculate size of each `PhysicalSortExpr` more accurately.
+        total
+    }
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+    fn convert_array_agg_to_orderings(
+        &self,
+        in_data: ScalarValue,
+    ) -> Result<Vec<Vec<ScalarValue>>> {
+        if let ScalarValue::List(Some(list_vals), _field_ref) = in_data {
+            list_vals.into_iter().map(|struct_vals| {
+                if let ScalarValue::Struct(Some(orderings), _fields) = struct_vals {
+                    Ok(orderings)
+                } else {
+                    Err(DataFusionError::Execution(format!(
+                        "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
+                        struct_vals.get_datatype()
+                    )))
+                }
+            }).collect::<Result<Vec<_>>>()
+        } else {
+            Err(DataFusionError::Execution(format!(
+                "Expects to receive ScalarValue::List(Some(..), _) but got:{:?}",
+                in_data.get_datatype()
+            )))
+        }
+    }
+
+    fn evaluate_orderings(&self) -> Result<ScalarValue> {
+        let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
+        let struct_field = Fields::from(fields.clone());
+        let orderings = self
+            .ordering_values
+            .iter()
+            .map(|ordering| {
+                ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
+            })
+            .collect();
+        let struct_type = DataType::Struct(Fields::from(fields));
+        Ok(ScalarValue::new_list(Some(orderings), struct_type))
+    }
+}
+
+/// This is a wrapper struct that enables us to correctly merge ARRAY_AGG
+/// data from multiple partitions using a `BinaryHeap`. When used inside a
+/// `BinaryHeap`, this struct yields the smallest `CustomElement`, where
+/// "smallest" is determined by the `ordering` values (`Vec<ScalarValue>`)
+/// according to `sort_options`.
+#[derive(Debug, PartialEq, Eq)]
+struct CustomElement<'a> {
+    // Stores the partition this entry came from
+    branch_idx: usize,
+    // Values to be merged
+    value: ScalarValue,
+    // Comparisons are done according to these `ordering` values
+    ordering: Vec<ScalarValue>,
+    // `sort_options` defines the ordering desired by the user
+    sort_options: &'a [SortOptions],
+}
+
+impl<'a> CustomElement<'a> {
+    fn new(
+        branch_idx: usize,
+        value: ScalarValue,
+        ordering: Vec<ScalarValue>,
+        sort_options: &'a [SortOptions],
+    ) -> Self {
+        Self {
+            branch_idx,
+            value,
+            ordering,
+            sort_options,
+        }
+    }
+
+    fn ordering(
+        &self,
+        current: &[ScalarValue],
+        target: &[ScalarValue],
+    ) -> Result<Ordering> {
+        // Calculate ordering according to `sort_options`
+        compare_rows(current, target, self.sort_options)
+    }
+}
+
+// Overwrite the ordering implementation such that
+// - `self.ordering` values are used for comparison,
+// - when used inside a `BinaryHeap`, it acts as a min-heap.
+impl<'a> Ord for CustomElement<'a> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // Compares according to the custom ordering
+        self.ordering(&self.ordering, &other.ordering)
+            // Convert the max heap into a min heap
+            .map(|ordering| ordering.reverse())
+            // This function returns an error when `self.ordering` and `other.ordering`
+            // have different types (e.g. one is `ScalarValue::Int64` while the other is
+            // `ScalarValue::Float32`). That cannot happen here, because data coming
+            // from each partition has the same type.
+            .unwrap()
+    }
+}
+
+impl<'a> PartialOrd for CustomElement<'a> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
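
The reversed comparison above is the standard trick for turning Rust's max-heap `BinaryHeap` into a min-heap. The same effect can be demonstrated with the standard library alone via `std::cmp::Reverse`:

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    fn main() {
        // BinaryHeap is a max-heap; wrapping elements in Reverse inverts the
        // comparison, so pop() yields the smallest element first; this is
        // exactly what `CustomElement`'s reversed Ord achieves for the merge.
        let mut heap = BinaryHeap::new();
        for value in [5, 1, 4, 2, 3] {
            heap.push(Reverse(value));
        }
        let mut drained = Vec::new();
        while let Some(Reverse(value)) = heap.pop() {
            drained.push(value);
        }
        assert_eq!(drained, vec![1, 2, 3, 4, 5]);
    }
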
+
+/// This function merges the `values` array (`&[Vec<ScalarValue>]`) into a single
+/// array (`Vec<ScalarValue>`). Merging is done according to the ordering values
+/// stored inside `ordering_values` (`&[Vec<Vec<ScalarValue>>]`). The inner
+/// `Vec<ScalarValue>` in `ordering_values` can be thought of as the ordering
+/// information for each `ScalarValue` in the `values` array. The desired ordering
+/// is specified by the `sort_options` argument (it should have the same size as
+/// the inner `Vec<ScalarValue>`s of the `ordering_values` array).
+///
+/// As an example, `values` can be \[
+///      \[1, 2, 3, 4, 5\],
+///      \[1, 2, 3, 4\],
+///      \[1, 2, 3, 4, 5, 6\],
+/// \]
+/// In this case, we merge three arrays (they don't have to be the same size) and
+/// produce a merged array of size 15 (the sum of 5, 4 and 6). Merging is done
+/// according to the ordering in the `ordering_values` vector. As an example,
+/// `ordering_values` can be \[
+///      \[(1, a), (2, b), (3, b), (4, a), (5, b)\],
+///      \[(1, a), (2, b), (3, b), (4, a)\],
+///      \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b)\],
+/// \]
+/// For each `ScalarValue` in `values`, we have a corresponding `Vec<ScalarValue>`
+/// (e.g. its timestamp). For the example above, `sort_options` has size two; it
+/// defines the ordering requirement of the merge. The inner `Vec<ScalarValue>`s of
+/// `ordering_values` are compared according to `sort_options` (their sizes should
+/// match).
+fn merge_ordered_arrays(
+    // We merge the values into a single `Vec<ScalarValue>`.
+    values: &[Vec<ScalarValue>],
+    // `values` will be merged according to `ordering_values`. The inner
+    // `Vec<ScalarValue>` can be thought of as the ordering information for
+    // each `ScalarValue` in `values`.
+    ordering_values: &[Vec<Vec<ScalarValue>>],
+    // Defines the ordering according to which comparisons are done.
+    sort_options: &[SortOptions],
+) -> Result<Vec<ScalarValue>> {
+    // Keep track of the most recent entry from each branch in a binary heap.
+    let mut heap: BinaryHeap<CustomElement> = BinaryHeap::new();
+
+    if !(values.len() == ordering_values.len()
+        && values
+            .iter()
+            .zip(ordering_values.iter())
+            .all(|(vals, ordering_vals)| vals.len() == ordering_vals.len()))
+    {
+        return Err(DataFusionError::Execution(
+            "Expects values arguments and/or ordering_values arguments to have same size"
+                .to_string(),
+        ));
+    }
+    let n_branch = values.len();
+    // For each branch, keep track of the index of the next entry to be merged.
+    let mut indices = vec![0_usize; n_branch];
+    // Keep track of the size of each branch.
+    let end_indices = (0..n_branch)
+        .map(|idx| values[idx].len())
+        .collect::<Vec<_>>();
+    let mut merged_values = vec![];
+    // Continue looping until the data of all branches is consumed.
+    loop {
+        let min_elem = if let Some(min_elem) = heap.pop() {
+            min_elem
+        } else {
+            // The heap is empty; fill it with the next entry from each branch.
+            for (idx, end_idx, ordering, branch_index) in izip!(
+                indices.iter(),
+                end_indices.iter(),
+                ordering_values.iter(),
+                0..n_branch
+            ) {
+                // This branch is fully consumed; skip it.
+                if idx == end_idx {
+                    continue;
+                }
+
+                // Push the next element to the heap.
+                let elem = CustomElement::new(
+                    branch_index,
+                    values[branch_index][*idx].clone(),
+                    ordering[*idx].to_vec(),
+                    sort_options,
+                );
+                heap.push(elem);
+            }
+            // Now that the heap is filled, pop the minimum entry (the next element of the merge).
+            if let Some(min_elem) = heap.pop() {
+                min_elem
+            } else {
+                // The heap is still empty: all indices have reached their end indices,
+                // i.e. we have consumed all branches. Merging is complete;
+                // exit the loop.
+                break;
+            }
+        };
+        let branch_idx = min_elem.branch_idx;
+        // Increment the index of the merged branch.
+        indices[branch_idx] += 1;
+        let row_idx = indices[branch_idx];
+        merged_values.push(min_elem.value.clone());
+        if row_idx < end_indices[branch_idx] {
+            // Push the next entry of the most recently consumed branch onto
+            // the heap, if one is available.
+            let value = values[branch_idx][row_idx].clone();
+            let ordering_row = ordering_values[branch_idx][row_idx].to_vec();
+            let elem = CustomElement::new(branch_idx, value, ordering_row, sort_options);
+            heap.push(elem);
+        }
+    }
+
+    Ok(merged_values)
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::aggregate::array_agg_ordered::merge_ordered_arrays;
+    use arrow_array::{Array, ArrayRef, Int64Array};
+    use arrow_schema::SortOptions;
+    use datafusion_common::utils::get_row_at_idx;
+    use datafusion_common::{Result, ScalarValue};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_merge_asc() -> Result<()> {
+        let lhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
+            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
+        ];
+        let n_row = lhs_arrays[0].len();
+        let lhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
+            .collect::<Result<Vec<_>>>()?;
+
+        let rhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
+            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
+        ];
+        let n_row = rhs_arrays[0].len();
+        let rhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
+            .collect::<Result<Vec<_>>>()?;
+        let sort_options = vec![
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+        ];
+
+        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
+        let lhs_vals = (0..lhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
+            .collect::<Result<Vec<_>>>()?;
+
+        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
+        let rhs_vals = (0..rhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
+            .collect::<Result<Vec<_>>>()?;
+        let expected =
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef;
+
+        let merged_vals = merge_ordered_arrays(
+            &[lhs_vals, rhs_vals],
+            &[lhs_orderings, rhs_orderings],
+            &sort_options,
+        )?;
+        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
+        assert_eq!(&merged_vals, &expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_merge_desc() -> Result<()> {
+        let lhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
+            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
+        ];
+        let n_row = lhs_arrays[0].len();
+        let lhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
+            .collect::<Result<Vec<_>>>()?;
+
+        let rhs_arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
+            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
+        ];
+        let n_row = rhs_arrays[0].len();
+        let rhs_orderings = (0..n_row)
+            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
+            .collect::<Result<Vec<_>>>()?;
+        let sort_options = vec![
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+        ];
+
+        // The values to be merged don't have to be ordered themselves.
+        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
+        let lhs_vals = (0..lhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
+            .collect::<Result<Vec<_>>>()?;
+
+        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
+        let rhs_vals = (0..rhs_vals_arr.len())
+            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
+            .collect::<Result<Vec<_>>>()?;
+        let expected =
+            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
+
+        let merged_vals = merge_ordered_arrays(
+            &[lhs_vals, rhs_vals],
+            &[lhs_orderings, rhs_orderings],
+            &sort_options,
+        )?;
+        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
+
+        assert_eq!(&merged_vals, &expected);
+        Ok(())
+    }
+}
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index 71ddf91315..3f778b6f17 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -26,7 +26,7 @@
 //! * Signature: see `Signature`
 //! * Return type: a function `(arg_types) -> return_type`. E.g. for min, ([f32]) -> f32, ([f64]) -> f64.
 
-use crate::{expressions, AggregateExpr, PhysicalExpr};
+use crate::{expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr};
 use arrow::datatypes::Schema;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::aggregate_function::{return_type, sum_type_of_avg};
@@ -39,6 +39,7 @@ pub fn create_aggregate_expr(
     fun: &AggregateFunction,
     distinct: bool,
     input_phy_exprs: &[Arc<dyn PhysicalExpr>],
+    ordering_req: &[PhysicalSortExpr],
     input_schema: &Schema,
     name: impl Into<String>,
 ) -> Result<Arc<dyn AggregateExpr>> {
@@ -49,8 +50,11 @@ pub fn create_aggregate_expr(
         .map(|e| e.data_type(input_schema))
         .collect::<Result<Vec<_>>>()?;
     let rt_type = return_type(fun, &input_phy_types)?;
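+    // Data types of the expressions in the ordering requirement (used by order-sensitive aggregates).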
+    let ordering_types = ordering_req
+        .iter()
+        .map(|e| e.expr.data_type(input_schema))
+        .collect::<Result<Vec<_>>>()?;
     let input_phy_exprs = input_phy_exprs.to_vec();
-
     Ok(match (fun, distinct) {
         (AggregateFunction::Count, false) => Arc::new(
             expressions::Count::new_with_multiple_exprs(input_phy_exprs, name, rt_type),
@@ -116,12 +120,27 @@ pub fn create_aggregate_expr(
                 input_phy_types[0].clone(),
             ))
         }
-        (AggregateFunction::ArrayAgg, false) => Arc::new(expressions::ArrayAgg::new(
-            input_phy_exprs[0].clone(),
-            name,
-            input_phy_types[0].clone(),
-        )),
+        (AggregateFunction::ArrayAgg, false) => {
+            let expr = input_phy_exprs[0].clone();
+            let data_type = input_phy_types[0].clone();
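+            // Without an ORDER BY clause use the order-insensitive ArrayAgg;
+            // otherwise, use the order-sensitive variant.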
+            if ordering_req.is_empty() {
+                Arc::new(expressions::ArrayAgg::new(expr, name, data_type))
+            } else {
+                Arc::new(expressions::OrderSensitiveArrayAgg::new(
+                    expr,
+                    name,
+                    data_type,
+                    ordering_types,
+                    ordering_req.to_vec(),
+                ))
+            }
+        }
         (AggregateFunction::ArrayAgg, true) => {
+            if !ordering_req.is_empty() {
+                return Err(DataFusionError::NotImplemented(
+                    "ARRAY_AGG(DISTINCT ORDER BY a ASC) order-sensitive aggregations are not available".to_string(),
+                ));
+            }
             Arc::new(expressions::DistinctArrayAgg::new(
                 input_phy_exprs[0].clone(),
                 name,
@@ -292,11 +311,15 @@ pub fn create_aggregate_expr(
             input_phy_exprs[0].clone(),
             name,
             input_phy_types[0].clone(),
+            ordering_req.to_vec(),
+            ordering_types,
         )),
         (AggregateFunction::LastValue, _) => Arc::new(expressions::LastValue::new(
             input_phy_exprs[0].clone(),
             name,
             input_phy_types[0].clone(),
+            ordering_req.to_vec(),
+            ordering_types,
         )),
     })
 }
@@ -1200,7 +1223,7 @@ mod tests {
                 "Invalid or wrong number of arguments passed to aggregate: '{name}'",
             )));
         }
-        create_aggregate_expr(fun, distinct, &coerced_phy_exprs, input_schema, name)
+        create_aggregate_expr(fun, distinct, &coerced_phy_exprs, &[], input_schema, name)
     }
 
     // Returns the coerced exprs for each `input_exprs`.
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index a350637c48..f322419a7b 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -17,9 +17,9 @@
 
 //! Defines the FIRST_VALUE/LAST_VALUE aggregations.
 
-use crate::aggregate::utils::down_cast_any_ref;
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
 use crate::expressions::format_state_name;
-use crate::{AggregateExpr, PhysicalExpr};
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
 
 use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType, Field};
@@ -27,6 +27,9 @@ use arrow_array::Array;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
 
+use arrow::compute;
+use arrow_array::cast::AsArray;
+use datafusion_common::utils::get_row_at_idx;
 use std::any::Any;
 use std::sync::Arc;
 
@@ -34,8 +37,10 @@ use std::sync::Arc;
 #[derive(Debug)]
 pub struct FirstValue {
     name: String,
-    pub data_type: DataType,
+    input_data_type: DataType,
+    order_by_data_types: Vec<DataType>,
     expr: Arc<dyn PhysicalExpr>,
+    ordering_req: LexOrdering,
 }
 
 impl FirstValue {
@@ -43,12 +48,16 @@ impl FirstValue {
     pub fn new(
         expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
-        data_type: DataType,
+        input_data_type: DataType,
+        ordering_req: LexOrdering,
+        order_by_data_types: Vec<DataType>,
     ) -> Self {
         Self {
             name: name.into(),
-            data_type,
+            input_data_type,
+            order_by_data_types,
             expr,
+            ordering_req,
         }
     }
 }
@@ -60,25 +69,46 @@ impl AggregateExpr for FirstValue {
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.data_type.clone(), true))
+        Ok(Field::new(&self.name, self.input_data_type.clone(), true))
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(FirstValueAccumulator::try_new(&self.data_type)?))
+        Ok(Box::new(FirstValueAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+        )?))
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![Field::new(
+        let mut fields = vec![Field::new(
             format_state_name(&self.name, "first_value"),
-            self.data_type.clone(),
+            self.input_data_type.clone(),
             true,
-        )])
+        )];
+        fields.extend(ordering_fields(
+            &self.ordering_req,
+            &self.order_by_data_types,
+        ));
+        fields.push(Field::new(
+            format_state_name(&self.name, "is_set"),
+            DataType::Boolean,
+            true,
+        ));
+        Ok(fields)
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
         vec![self.expr.clone()]
     }
 
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.ordering_req.is_empty() {
+            None
+        } else {
+            Some(&self.ordering_req)
+        }
+    }
+
     fn name(&self) -> &str {
         &self.name
     }
@@ -92,12 +122,17 @@ impl AggregateExpr for FirstValue {
         Some(Arc::new(LastValue::new(
             self.expr.clone(),
             name,
-            self.data_type.clone(),
+            self.input_data_type.clone(),
+            self.ordering_req.clone(),
+            self.order_by_data_types.clone(),
         )))
     }
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(FirstValueAccumulator::try_new(&self.data_type)?))
+        Ok(Box::new(FirstValueAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+        )?))
     }
 }
 
@@ -107,7 +142,8 @@ impl PartialEq<dyn Any> for FirstValue {
             .downcast_ref::<Self>()
             .map(|x| {
                 self.name == x.name
-                    && self.data_type == x.data_type
+                    && self.input_data_type == x.input_data_type
+                    && self.order_by_data_types == x.order_by_data_types
                     && self.expr.eq(&x.expr)
             })
             .unwrap_or(false)
@@ -117,34 +153,44 @@ impl PartialEq<dyn Any> for FirstValue {
 #[derive(Debug)]
 struct FirstValueAccumulator {
     first: ScalarValue,
-    // At the beginning, `is_set` is `false`, this means `first` is not seen yet.
-    // Once we see (`is_set=true`) first value, we do not update `first`.
+    // At the beginning, `is_set` is false, which means `first` is not seen yet.
+    // Once we see the first value, we set the `is_set` flag and do not update `first` anymore.
     is_set: bool,
+    // Stores the ordering values corresponding to the first value of the aggregator;
+    // these values are used when merging results across multiple partitions.
+    orderings: Vec<ScalarValue>,
 }
 
 impl FirstValueAccumulator {
     /// Creates a new `FirstValueAccumulator` for the given `data_type`.
-    pub fn try_new(data_type: &DataType) -> Result<Self> {
+    pub fn try_new(data_type: &DataType, ordering_dtypes: &[DataType]) -> Result<Self> {
+        let orderings = ordering_dtypes
+            .iter()
+            .map(ScalarValue::try_from)
+            .collect::<Result<Vec<_>>>()?;
         ScalarValue::try_from(data_type).map(|value| Self {
             first: value,
             is_set: false,
+            orderings,
         })
     }
 }
 
 impl Accumulator for FirstValueAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![
-            self.first.clone(),
-            ScalarValue::Boolean(Some(self.is_set)),
-        ])
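+        // State layout: [first_value, ordering values..., is_set flag].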
+        let mut result = vec![self.first.clone()];
+        result.extend(self.orderings.iter().cloned());
+        result.push(ScalarValue::Boolean(Some(self.is_set)));
+        Ok(result)
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         // If we have seen the first value, we shouldn't update it
-        let values = &values[0];
-        if !values.is_empty() && !self.is_set {
-            self.first = ScalarValue::try_from_array(values, 0)?;
+        if !values[0].is_empty() && !self.is_set {
+            let row = get_row_at_idx(values, 0)?;
+            // Update with the first value in the array.
+            self.first = row[0].clone();
+            self.orderings = row[1..].to_vec();
             self.is_set = true;
         }
         Ok(())
@@ -152,7 +198,14 @@ impl Accumulator for FirstValueAccumulator {
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
         // FIRST_VALUE(first1, first2, first3, ...)
-        self.update_batch(states)
+        // The last state column is the `is_set` flag; use it to filter out rows
+        // from partitions that have not seen any value yet.
+        let last_idx = states.len() - 1;
+        let is_set_flags = &states[last_idx];
+        let flags = is_set_flags.as_boolean();
+        let mut filtered_states = vec![];
+        // Filter every state column except the trailing `is_set` flag itself.
+        for state in states.iter().take(last_idx) {
+            filtered_states.push(compute::filter(state, flags)?)
+        }
+        self.update_batch(&filtered_states)
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {
@@ -162,6 +215,8 @@ impl Accumulator for FirstValueAccumulator {
     fn size(&self) -> usize {
         std::mem::size_of_val(self) - std::mem::size_of_val(&self.first)
             + self.first.size()
+            + ScalarValue::size_of_vec(&self.orderings)
+            - std::mem::size_of_val(&self.orderings)
     }
 }
 
@@ -169,8 +224,10 @@ impl Accumulator for FirstValueAccumulator {
 #[derive(Debug)]
 pub struct LastValue {
     name: String,
-    pub data_type: DataType,
+    input_data_type: DataType,
+    order_by_data_types: Vec<DataType>,
     expr: Arc<dyn PhysicalExpr>,
+    ordering_req: LexOrdering,
 }
 
 impl LastValue {
@@ -178,12 +235,16 @@ impl LastValue {
     pub fn new(
         expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
-        data_type: DataType,
+        input_data_type: DataType,
+        ordering_req: LexOrdering,
+        order_by_data_types: Vec<DataType>,
     ) -> Self {
         Self {
             name: name.into(),
-            data_type,
+            input_data_type,
+            order_by_data_types,
             expr,
+            ordering_req,
         }
     }
 }
@@ -195,25 +256,46 @@ impl AggregateExpr for LastValue {
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.data_type.clone(), true))
+        Ok(Field::new(&self.name, self.input_data_type.clone(), true))
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(LastValueAccumulator::try_new(&self.data_type)?))
+        Ok(Box::new(LastValueAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+        )?))
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![Field::new(
+        let mut fields = vec![Field::new(
             format_state_name(&self.name, "last_value"),
-            self.data_type.clone(),
+            self.input_data_type.clone(),
+            true,
+        )];
+        fields.extend(ordering_fields(
+            &self.ordering_req,
+            &self.order_by_data_types,
+        ));
+        fields.push(Field::new(
+            format_state_name(&self.name, "is_set"),
+            DataType::Boolean,
             true,
-        )])
+        ));
+        Ok(fields)
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
         vec![self.expr.clone()]
     }
 
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        if self.ordering_req.is_empty() {
+            None
+        } else {
+            Some(&self.ordering_req)
+        }
+    }
+
     fn name(&self) -> &str {
         &self.name
     }
@@ -227,12 +309,17 @@ impl AggregateExpr for LastValue {
         Some(Arc::new(FirstValue::new(
             self.expr.clone(),
             name,
-            self.data_type.clone(),
+            self.input_data_type.clone(),
+            self.ordering_req.clone(),
+            self.order_by_data_types.clone(),
         )))
     }
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(LastValueAccumulator::try_new(&self.data_type)?))
+        Ok(Box::new(LastValueAccumulator::try_new(
+            &self.input_data_type,
+            &self.order_by_data_types,
+        )?))
     }
 }
 
@@ -242,7 +329,8 @@ impl PartialEq<dyn Any> for LastValue {
             .downcast_ref::<Self>()
             .map(|x| {
                 self.name == x.name
-                    && self.data_type == x.data_type
+                    && self.input_data_type == x.input_data_type
+                    && self.order_by_data_types == x.order_by_data_types
                     && self.expr.eq(&x.expr)
             })
             .unwrap_or(false)
@@ -252,34 +340,57 @@ impl PartialEq<dyn Any> for LastValue {
 #[derive(Debug)]
 struct LastValueAccumulator {
     last: ScalarValue,
+    // The `is_set` flag keeps track of whether the last value is finalized.
+    // This information is used to discriminate between genuine NULLs and NULLs
+    // that occur due to empty partitions.
+    is_set: bool,
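+    // Ordering values corresponding to the last value; used when merging partitions.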
+    orderings: Vec<ScalarValue>,
 }
 
 impl LastValueAccumulator {
     /// Creates a new `LastValueAccumulator` for the given `data_type`.
-    pub fn try_new(data_type: &DataType) -> Result<Self> {
+    pub fn try_new(data_type: &DataType, ordering_dtypes: &[DataType]) -> Result<Self> {
+        let orderings = ordering_dtypes
+            .iter()
+            .map(ScalarValue::try_from)
+            .collect::<Result<Vec<_>>>()?;
         Ok(Self {
             last: ScalarValue::try_from(data_type)?,
+            is_set: false,
+            orderings,
         })
     }
 }
 
 impl Accumulator for LastValueAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![self.last.clone()])
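+        // State layout: [last_value, ordering values..., is_set flag].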
+        let mut result = vec![self.last.clone()];
+        result.extend(self.orderings.clone());
+        result.push(ScalarValue::Boolean(Some(self.is_set)));
+        Ok(result)
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let values = &values[0];
-        if !values.is_empty() {
+        if !values[0].is_empty() {
+            let row = get_row_at_idx(values, values[0].len() - 1)?;
             // Update with last value in the array.
-            self.last = ScalarValue::try_from_array(values, values.len() - 1)?;
+            self.last = row[0].clone();
+            self.orderings = row[1..].to_vec();
+            self.is_set = true;
         }
         Ok(())
     }
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
         // LAST_VALUE(last1, last2, last3, ...)
-        self.update_batch(states)
+        // The last state column is the `is_set` flag; use it to filter out rows
+        // from partitions that have not seen any value yet.
+        let last_idx = states.len() - 1;
+        let is_set_flags = &states[last_idx];
+        let flags = is_set_flags.as_boolean();
+        let mut filtered_states = vec![];
+        // Filter every state column except the trailing `is_set` flag itself.
+        for state in states.iter().take(last_idx) {
+            filtered_states.push(compute::filter(state, flags)?)
+        }
+        self.update_batch(&filtered_states)
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {
@@ -287,7 +398,10 @@ impl Accumulator for LastValueAccumulator {
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.last) + self.last.size()
+        std::mem::size_of_val(self) - std::mem::size_of_val(&self.last)
+            + self.last.size()
+            + ScalarValue::size_of_vec(&self.orderings)
+            - std::mem::size_of_val(&self.orderings)
     }
 }
 
@@ -302,8 +416,9 @@ mod tests {
 
     #[test]
     fn test_first_last_value_value() -> Result<()> {
-        let mut first_accumulator = FirstValueAccumulator::try_new(&DataType::Int64)?;
-        let mut last_accumulator = LastValueAccumulator::try_new(&DataType::Int64)?;
+        let mut first_accumulator =
+            FirstValueAccumulator::try_new(&DataType::Int64, &[])?;
+        let mut last_accumulator = LastValueAccumulator::try_new(&DataType::Int64, &[])?;
         // first value in the tuple is start of the range (inclusive),
         // second value in the tuple is end of the range (exclusive)
         let ranges: Vec<(i64, i64)> = vec![(0, 10), (1, 11), (2, 13)];
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 7d2316c532..9be6d5e1ba 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::aggregate::row_accumulator::RowAccumulator;
-use crate::expressions::{ArrayAgg, FirstValue, LastValue};
-use crate::PhysicalExpr;
+use crate::expressions::{FirstValue, LastValue, OrderSensitiveArrayAgg};
+use crate::{PhysicalExpr, PhysicalSortExpr};
 use arrow::datatypes::Field;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
@@ -31,6 +31,7 @@ pub(crate) mod approx_percentile_cont;
 pub(crate) mod approx_percentile_cont_with_weight;
 pub(crate) mod array_agg;
 pub(crate) mod array_agg_distinct;
+pub(crate) mod array_agg_ordered;
 pub(crate) mod average;
 pub(crate) mod bit_and_or_xor;
 pub(crate) mod bool_and_or;
@@ -85,6 +86,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
     /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
 
+    /// Order-by requirements for the aggregate function. By default this is `None`,
+    /// meaning there is no requirement. Order-sensitive aggregators, such as
+    /// `FIRST_VALUE(x ORDER BY y)`, should override this method.
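+    /// (For instance, `FirstValue` and `LastValue` return their `ordering_req` when it is non-empty.)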
+    fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
     /// Human readable name such as `"MIN(c2)"`. The default
     /// implementation returns placeholder text.
     fn name(&self) -> &str {
@@ -133,5 +141,5 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
 pub fn is_order_sensitive(aggr_expr: &Arc<dyn AggregateExpr>) -> bool {
     aggr_expr.as_any().is::<FirstValue>()
         || aggr_expr.as_any().is::<LastValue>()
-        || aggr_expr.as_any().is::<ArrayAgg>()
+        || aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
 }
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index aa0834a89c..f6f0086919 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -17,10 +17,10 @@
 
 //! Utilities used in aggregates
 
-use crate::AggregateExpr;
+use crate::{AggregateExpr, PhysicalSortExpr};
 use arrow::array::ArrayRef;
 use arrow::datatypes::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
 use std::any::Any;
@@ -103,3 +103,23 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
         any
     }
 }
+
+/// Constructs the fields corresponding to a lexicographical ordering requirement.
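+/// For example, a requirement `a ASC` over an `Int64` column yields a single nullable
+/// `Int64` field named after the string form of the sort expression.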
+pub(crate) fn ordering_fields(
+    ordering_req: &[PhysicalSortExpr],
+    // Data type of each expression in the ordering requirement
+    data_types: &[DataType],
+) -> Vec<Field> {
+    ordering_req
+        .iter()
+        .zip(data_types.iter())
+        .map(|(expr, dtype)| {
+            Field::new(
+                expr.to_string().as_str(),
+                dtype.clone(),
+                // Some partitions may be empty, hence the field should be nullable.
+                true,
+            )
+        })
+        .collect()
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 0ca132aefd..c660cfadcc 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -46,6 +46,7 @@ pub use crate::aggregate::approx_percentile_cont::ApproxPercentileCont;
 pub use crate::aggregate::approx_percentile_cont_with_weight::ApproxPercentileContWithWeight;
 pub use crate::aggregate::array_agg::ArrayAgg;
 pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg;
+pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
 pub use crate::aggregate::average::{Avg, AvgAccumulator};
 pub use crate::aggregate::bit_and_or_xor::{BitAnd, BitOr, BitXor, DistinctBitXor};
 pub use crate::aggregate::bool_and_or::{BoolAnd, BoolOr};
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 9b05dea712..7fddd31a6b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1107,6 +1107,7 @@ message PhysicalAggregateExprNode {
     string user_defined_aggr_function = 4;
   }
   repeated PhysicalExprNode expr = 2;
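+  // Ordering requirement of the aggregate expression (empty when the aggregate is order-insensitive).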
+  repeated PhysicalSortExprNode ordering_req = 5;
   bool distinct = 3;
 }
 
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 890fe7221a..4c1bab5e39 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -13080,6 +13080,9 @@ impl serde::Serialize for PhysicalAggregateExprNode {
         if !self.expr.is_empty() {
             len += 1;
         }
+        if !self.ordering_req.is_empty() {
+            len += 1;
+        }
         if self.distinct {
             len += 1;
         }
@@ -13090,6 +13093,9 @@ impl serde::Serialize for PhysicalAggregateExprNode {
         if !self.expr.is_empty() {
             struct_ser.serialize_field("expr", &self.expr)?;
         }
+        if !self.ordering_req.is_empty() {
+            struct_ser.serialize_field("orderingReq", &self.ordering_req)?;
+        }
         if self.distinct {
             struct_ser.serialize_field("distinct", &self.distinct)?;
         }
@@ -13116,6 +13122,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
     {
         const FIELDS: &[&str] = &[
             "expr",
+            "ordering_req",
+            "orderingReq",
             "distinct",
             "aggr_function",
             "aggrFunction",
@@ -13126,6 +13134,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Expr,
+            OrderingReq,
             Distinct,
             AggrFunction,
             UserDefinedAggrFunction,
@@ -13151,6 +13160,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
                     {
                         match value {
                             "expr" => Ok(GeneratedField::Expr),
+                            "orderingReq" | "ordering_req" => Ok(GeneratedField::OrderingReq),
                             "distinct" => Ok(GeneratedField::Distinct),
                             "aggrFunction" | "aggr_function" => Ok(GeneratedField::AggrFunction),
                             "userDefinedAggrFunction" | "user_defined_aggr_function" => Ok(GeneratedField::UserDefinedAggrFunction),
@@ -13174,6 +13184,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
                     V: serde::de::MapAccess<'de>,
             {
                 let mut expr__ = None;
+                let mut ordering_req__ = None;
                 let mut distinct__ = None;
                 let mut aggregate_function__ = None;
                 while let Some(k) = map.next_key()? {
@@ -13184,6 +13195,12 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
                             }
                             expr__ = Some(map.next_value()?);
                         }
+                        GeneratedField::OrderingReq => {
+                            if ordering_req__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("orderingReq"));
+                            }
+                            ordering_req__ = Some(map.next_value()?);
+                        }
                         GeneratedField::Distinct => {
                             if distinct__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("distinct"));
@@ -13206,6 +13223,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalAggregateExprNode {
                 }
                 Ok(PhysicalAggregateExprNode {
                     expr: expr__.unwrap_or_default(),
+                    ordering_req: ordering_req__.unwrap_or_default(),
                     distinct: distinct__.unwrap_or_default(),
                     aggregate_function: aggregate_function__,
                 })
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index b1ae0058dc..8dfc209477 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1513,6 +1513,8 @@ pub struct PhysicalScalarUdfNode {
 pub struct PhysicalAggregateExprNode {
     #[prost(message, repeated, tag = "2")]
     pub expr: ::prost::alloc::vec::Vec<PhysicalExprNode>,
+    #[prost(message, repeated, tag = "5")]
+    pub ordering_req: ::prost::alloc::vec::Vec<PhysicalSortExprNode>,
     #[prost(bool, tag = "3")]
     pub distinct: bool,
     #[prost(oneof = "physical_aggregate_expr_node::AggregateFunction", tags = "1, 4")]
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 904cf08f7b..1daa1c2e4b 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -441,7 +441,8 @@ impl AsExecutionPlan for PhysicalPlanNode {
                             ExprType::AggregateExpr(agg_node) => {
                                 let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node.expr.iter()
                                     .map(|e| parse_physical_expr(e, registry, &physical_schema).unwrap()).collect();
-
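+                                // Deserialize the aggregate's ordering requirement, if any.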
+                                let ordering_req: Vec<PhysicalSortExpr> = agg_node.ordering_req.iter()
+                                    .map(|e| parse_physical_sort_expr(e, registry, &physical_schema).unwrap()).collect();
                                 agg_node.aggregate_function.as_ref().map(|func| {
                                     match func {
                                         AggregateFunction::AggrFunction(i) => {
@@ -458,6 +459,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                                                 &aggr_function.into(),
                                                 agg_node.distinct,
                                                 input_phy_expr.as_slice(),
+                                                &ordering_req,
                                                 &physical_schema,
                                                 name.to_string(),
                                             )
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 0910ddaad0..aaf3569d16 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -67,6 +67,13 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
             .map(|e| e.clone().try_into())
             .collect::<Result<Vec<_>>>()?;
 
+        let ordering_req: Vec<protobuf::PhysicalSortExprNode> = a
+            .order_bys()
+            .unwrap_or(&[])
+            .iter()
+            .map(|e| e.clone().try_into())
+            .collect::<Result<Vec<_>>>()?;
+
         let mut distinct = false;
         let aggr_function = if a.as_any().downcast_ref::<Avg>().is_some() {
             Ok(AggregateFunction::Avg.into())
@@ -151,6 +158,10 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
             .is_some()
         {
             Ok(AggregateFunction::ApproxMedian.into())
+        } else if a.as_any().is::<expressions::FirstValue>() {
+            Ok(AggregateFunction::FirstValueAgg.into())
+        } else if a.as_any().is::<expressions::LastValue>() {
+            Ok(AggregateFunction::LastValueAgg.into())
         } else {
             if let Some(a) = a.as_any().downcast_ref::<AggregateFunctionExpr>() {
                 return Ok(protobuf::PhysicalExprNode {
@@ -158,6 +169,7 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
                         protobuf::PhysicalAggregateExprNode {
                             aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(a.fun().name.clone())),
                             expr: expressions,
+                            ordering_req,
                             distinct,
                         },
                     )),
@@ -178,6 +190,7 @@ impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
                         ),
                     ),
                     expr: expressions,
+                    ordering_req,
                     distinct,
                 },
             )),