Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/06 10:07:38 UTC

[arrow-datafusion] branch main updated: [MINOR]: Refactor to increase readability (#5874)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e84ef3742a [MINOR]: Refactor to increase readability (#5874)
e84ef3742a is described below

commit e84ef3742ae76dd0f5fdf98f7826afcefa59200e
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Thu Apr 6 13:07:31 2023 +0300

    [MINOR]: Refactor to increase readability (#5874)
    
    * Change return api for util codes
    
    * Refactor
    
    * make evaluate_partition points function
    
    * rename function
    
    * Simplify sort removal logic for windowing
    
    * Rename function
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion/common/src/lib.rs                       |  10 --
 datafusion/common/src/utils.rs                     |  55 +++++++-
 .../src/physical_optimizer/sort_enforcement.rs     | 138 ++++++++++-----------
 .../windows/bounded_window_agg_exec.rs             |  28 +----
 .../src/physical_plan/windows/window_agg_exec.rs   |  27 +---
 datafusion/physical-expr/src/window/built_in.rs    |   6 +-
 datafusion/physical-expr/src/window/window_expr.rs |  24 +---
 7 files changed, 130 insertions(+), 158 deletions(-)

diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 2b227fd1db..1b59f43a67 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -32,7 +32,6 @@ pub mod test_util;
 pub mod tree_node;
 pub mod utils;
 
-use arrow::compute::SortOptions;
 pub use column::Column;
 pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
 pub use error::{
@@ -68,12 +67,3 @@ macro_rules! downcast_value {
         })?
     }};
 }
-
-/// Computes the "reverse" of given `SortOptions`.
-// TODO: If/when arrow supports `!` for `SortOptions`, we can remove this.
-pub fn reverse_sort_options(options: SortOptions) -> SortOptions {
-    SortOptions {
-        descending: !options.descending,
-        nulls_first: !options.nulls_first,
-    }
-}
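
For context: the helper removed here is superseded by arrow's `Not` implementation for `SortOptions` (the `!` operator the TODO above was waiting for). A minimal standalone sketch, assuming only the arrow crate:

    use arrow::compute::SortOptions;

    fn main() {
        let asc_nulls_last = SortOptions {
            descending: false,
            nulls_first: false,
        };
        // `!` flips both flags, which is exactly what the removed
        // `reverse_sort_options` helper did by hand.
        let reversed = !asc_nulls_last;
        assert!(reversed.descending);
        assert!(reversed.nulls_first);
    }
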
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 6872653fb4..3451152686 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -19,13 +19,14 @@
 
 use crate::{DataFusionError, Result, ScalarValue};
 use arrow::array::ArrayRef;
-use arrow::compute::SortOptions;
+use arrow::compute::{lexicographical_partition_ranges, SortColumn, SortOptions};
 use sqlparser::ast::Ident;
 use sqlparser::dialect::GenericDialect;
 use sqlparser::parser::{Parser, ParserError};
 use sqlparser::tokenizer::{Token, TokenWithLocation};
 use std::borrow::Cow;
 use std::cmp::Ordering;
+use std::ops::Range;
 
 /// Given column vectors, returns row at `idx`.
 pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
@@ -163,6 +164,23 @@ where
     Ok(low)
 }
 
+/// This function finds the partition points according to `partition_columns`.
+/// If there are no sort columns, then the result will be a single element
+/// vector containing one partition range spanning all data.
+pub fn evaluate_partition_ranges(
+    num_rows: usize,
+    partition_columns: &[SortColumn],
+) -> Result<Vec<Range<usize>>> {
+    Ok(if partition_columns.is_empty() {
+        vec![Range {
+            start: 0,
+            end: num_rows,
+        }]
+    } else {
+        lexicographical_partition_ranges(partition_columns)?.collect()
+    })
+}
+
 /// Wraps identifier string in double quotes, escaping any double quotes in
 /// the identifier by replacing it with two double quotes
 ///
@@ -256,6 +274,7 @@ pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec<String> {
 #[cfg(test)]
 mod tests {
     use arrow::array::Float64Array;
+    use arrow_array::Array;
     use std::sync::Arc;
 
     use crate::from_slice::FromSlice;
@@ -426,6 +445,40 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_evaluate_partition_ranges() -> Result<()> {
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Float64Array::from_slice([1.0, 1.0, 1.0, 2.0, 2.0, 2.0])),
+            Arc::new(Float64Array::from_slice([4.0, 4.0, 3.0, 2.0, 1.0, 1.0])),
+        ];
+        let n_row = arrays[0].len();
+        let options: Vec<SortOptions> = vec![
+            SortOptions {
+                descending: false,
+                nulls_first: false,
+            },
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+        ];
+        let sort_columns = arrays
+            .into_iter()
+            .zip(options)
+            .map(|(values, options)| SortColumn {
+                values,
+                options: Some(options),
+            })
+            .collect::<Vec<_>>();
+        let ranges = evaluate_partition_ranges(n_row, &sort_columns)?;
+        assert_eq!(ranges.len(), 4);
+        assert_eq!(ranges[0], Range { start: 0, end: 2 });
+        assert_eq!(ranges[1], Range { start: 2, end: 3 });
+        assert_eq!(ranges[2], Range { start: 3, end: 4 });
+        assert_eq!(ranges[3], Range { start: 4, end: 6 });
+        Ok(())
+    }
+
     #[test]
     fn test_parse_identifiers() -> Result<()> {
         let s = "CATALOG.\"F(o)o. \"\"bar\".table";
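
For context on the hunk above: the new `evaluate_partition_ranges` helper is a thin wrapper around arrow's `lexicographical_partition_ranges`. The following standalone sketch, assuming only the arrow crate (the `partition_ranges` name and the `main` driver are illustrative, not part of the commit), reproduces the ranges asserted by `test_evaluate_partition_ranges`:

    use std::ops::Range;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Float64Array};
    use arrow::compute::{lexicographical_partition_ranges, SortColumn, SortOptions};
    use arrow::error::ArrowError;

    /// Same idea as the new helper: one range per lexicographic partition, or a
    /// single range over all rows when there are no partition columns.
    fn partition_ranges(
        num_rows: usize,
        partition_columns: &[SortColumn],
    ) -> Result<Vec<Range<usize>>, ArrowError> {
        Ok(if partition_columns.is_empty() {
            vec![0..num_rows]
        } else {
            lexicographical_partition_ranges(partition_columns)?.collect()
        })
    }

    fn main() -> Result<(), ArrowError> {
        // Two sort keys: the first ascending, the second descending.
        let a: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 1.0, 1.0, 2.0, 2.0, 2.0]));
        let b: ArrayRef = Arc::new(Float64Array::from(vec![4.0, 4.0, 3.0, 2.0, 1.0, 1.0]));
        let opts = |descending| Some(SortOptions { descending, nulls_first: false });
        let columns = [
            SortColumn { values: a, options: opts(false) },
            SortColumn { values: b, options: opts(true) },
        ];
        // Matches the expectations in the new `test_evaluate_partition_ranges` test.
        assert_eq!(partition_ranges(6, &columns)?, vec![0..2, 2..3, 3..4, 4..6]);
        Ok(())
    }
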
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 7428c339dc..4817c01e5c 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -48,7 +48,7 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
-use datafusion_common::{reverse_sort_options, DataFusionError};
+use datafusion_common::DataFusionError;
 use datafusion_physical_expr::utils::{
     make_sort_exprs_from_requirements, ordering_satisfy,
     ordering_satisfy_requirement_concrete,
@@ -587,7 +587,7 @@ fn analyze_window_sort_removal(
         .map(|elem| elem.len())
         .unwrap_or(0);
 
-    let mut first_should_reverse = None;
+    let mut needs_reverse = None;
     for sort_any in sort_tree.get_leaves() {
         let sort_output_ordering = sort_any.output_ordering();
         // Variable `sort_any` will either be a `SortExec` or a
@@ -598,32 +598,27 @@ fn analyze_window_sort_removal(
         let sort_output_ordering = sort_output_ordering.ok_or_else(|| {
             DataFusionError::Plan("A SortExec should have output ordering".to_string())
         })?;
-        // It is enough to check whether first n_req element of the sort output satisfies window_exec requirement.
-        // Because length of window_exec requirement is n_req.
+        // It is enough to check whether the first "n_req" elements of the sort
+        // output satisfy window_exec's requirement as it is only "n_req" long.
         let required_ordering = &sort_output_ordering[0..n_req];
         if let Some(physical_ordering) = physical_ordering {
-            let (can_skip_sorting, should_reverse) = can_skip_sort(
+            if let Some(should_reverse) = can_skip_sort(
                 window_expr[0].partition_by(),
                 required_ordering,
                 &sort_input.schema(),
                 physical_ordering,
-            )?;
-            if !can_skip_sorting {
-                return Ok(None);
-            } else if let Some(first_should_reverse) = first_should_reverse {
-                if first_should_reverse != should_reverse {
-                    return Ok(None);
+            )? {
+                if should_reverse == *needs_reverse.get_or_insert(should_reverse) {
+                    continue;
                 }
-            } else {
-                first_should_reverse = Some(should_reverse);
             }
-        } else {
-            // If there is no physical ordering, there is no way to remove a
-            // sort, so immediately return.
-            return Ok(None);
         }
+        // If there is no physical ordering, if we cannot skip the sort, or if
+        // the window reversal requirements are not uniform, there is no
+        // opportunity for sort removal -- we return immediately.
+        return Ok(None);
     }
-    let new_window_expr = if first_should_reverse.unwrap() {
+    let new_window_expr = if needs_reverse.unwrap() {
         window_expr
             .iter()
             .map(|e| e.get_reverse_expr())
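
For context on the loop rewritten above: it records the first leaf's reversal flag with `Option::get_or_insert` and requires every later leaf to agree. A small self-contained sketch of that idiom (the `uniform_reverse` helper and its inputs are illustrative, not code from this commit):

    /// Returns `Some(flag)` when every element is `Some` and all flags agree;
    /// returns `None` as soon as one sort cannot be skipped or the flags differ.
    fn uniform_reverse(flags: &[Option<bool>]) -> Option<bool> {
        let mut needs_reverse = None;
        for flag in flags {
            if let Some(should_reverse) = *flag {
                // The first iteration stores the flag; later ones compare to it.
                if should_reverse == *needs_reverse.get_or_insert(should_reverse) {
                    continue;
                }
            }
            return None;
        }
        needs_reverse
    }

    fn main() {
        assert_eq!(uniform_reverse(&[Some(true), Some(true)]), Some(true));
        assert_eq!(uniform_reverse(&[Some(true), Some(false)]), None);
        assert_eq!(uniform_reverse(&[Some(false), None]), None);
    }
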
@@ -784,39 +779,40 @@ fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&[PhysicalSortExp
 #[derive(Debug)]
 /// This structure stores extra column information required to remove unnecessary sorts.
 pub struct ColumnInfo {
-    is_aligned: bool,
     reverse: bool,
     is_partition: bool,
 }
 
-/// Compares physical ordering and required ordering of all `PhysicalSortExpr`s and returns a tuple.
-/// The first element indicates whether these `PhysicalSortExpr`s can be removed from the physical plan.
-/// The second element is a flag indicating whether we should reverse the sort direction in order to
-/// remove physical sort expressions from the plan.
+/// Compares physical ordering and required ordering of all `PhysicalSortExpr`s
+/// to decide whether a `SortExec` before a `WindowAggExec` can be removed.
+/// A `None` return value indicates that we cannot remove the sort in question.
+/// A `Some(bool)` value indicates otherwise, and signals whether we need to
+/// reverse the ordering in order to remove the sort in question.
 pub fn can_skip_sort(
     partition_keys: &[Arc<dyn PhysicalExpr>],
     required: &[PhysicalSortExpr],
     input_schema: &SchemaRef,
     physical_ordering: &[PhysicalSortExpr],
-) -> Result<(bool, bool)> {
+) -> Result<Option<bool>> {
     if required.len() > physical_ordering.len() {
-        return Ok((false, false));
+        return Ok(None);
     }
     let mut col_infos = vec![];
     for (sort_expr, physical_expr) in zip(required, physical_ordering) {
         let column = sort_expr.expr.clone();
         let is_partition = partition_keys.iter().any(|e| e.eq(&column));
-        let (is_aligned, reverse) =
-            check_alignment(input_schema, physical_expr, sort_expr);
-        col_infos.push(ColumnInfo {
-            is_aligned,
-            reverse,
-            is_partition,
-        });
+        if let Some(reverse) = check_alignment(input_schema, physical_expr, sort_expr)? {
+            col_infos.push(ColumnInfo {
+                reverse,
+                is_partition,
+            });
+        } else {
+            return Ok(None);
+        }
     }
     let partition_by_sections = col_infos
         .iter()
-        .filter(|elem| elem.is_partition)
+        .filter(|c| c.is_partition)
         .collect::<Vec<_>>();
     let can_skip_partition_bys = if partition_by_sections.is_empty() {
         true
@@ -824,7 +820,7 @@ pub fn can_skip_sort(
         let first_reverse = partition_by_sections[0].reverse;
         let can_skip_partition_bys = partition_by_sections
             .iter()
-            .all(|c| c.is_aligned && c.reverse == first_reverse);
+            .all(|c| c.reverse == first_reverse);
         can_skip_partition_bys
     };
     let order_by_sections = col_infos
@@ -835,38 +831,36 @@ pub fn can_skip_sort(
         (true, false)
     } else {
         let first_reverse = order_by_sections[0].reverse;
-        let can_skip_order_bys = order_by_sections
-            .iter()
-            .all(|c| c.is_aligned && c.reverse == first_reverse);
+        let can_skip_order_bys =
+            order_by_sections.iter().all(|c| c.reverse == first_reverse);
         (can_skip_order_bys, first_reverse)
     };
     let can_skip = can_skip_order_bys && can_skip_partition_bys;
-    Ok((can_skip, should_reverse_order_bys))
+    Ok(can_skip.then_some(should_reverse_order_bys))
 }
 
-/// Compares `physical_ordering` and `required` ordering, returns a tuple
-/// indicating (1) whether this column requires sorting, and (2) whether we
-/// should reverse the window expression in order to avoid sorting.
+/// Compares `physical_ordering` with the `required` ordering and decides whether
+/// they are aligned. A `None` return value indicates that the current column is
+/// not aligned. A `Some(bool)` value indicates otherwise, and signals whether
+/// we should reverse the window expression in order to avoid sorting.
 fn check_alignment(
     input_schema: &SchemaRef,
     physical_ordering: &PhysicalSortExpr,
     required: &PhysicalSortExpr,
-) -> (bool, bool) {
-    if required.expr.eq(&physical_ordering.expr) {
-        let nullable = required.expr.nullable(input_schema).unwrap();
+) -> Result<Option<bool>> {
+    Ok(if required.expr.eq(&physical_ordering.expr) {
         let physical_opts = physical_ordering.options;
         let required_opts = required.options;
-        let is_reversed = if nullable {
-            physical_opts == reverse_sort_options(required_opts)
+        if required.expr.nullable(input_schema)? {
+            let reverse = physical_opts == !required_opts;
+            (reverse || physical_opts == required_opts).then_some(reverse)
         } else {
             // If the column is not nullable, NULLS FIRST/LAST is not important.
-            physical_opts.descending != required_opts.descending
-        };
-        let can_skip = !nullable || is_reversed || (physical_opts == required_opts);
-        (can_skip, is_reversed)
+            Some(physical_opts.descending != required_opts.descending)
+        }
     } else {
-        (false, false)
-    }
+        None
+    })
 }
 
 #[cfg(test)]
@@ -925,17 +919,17 @@ mod tests {
     async fn test_is_column_aligned_nullable() -> Result<()> {
         let schema = create_test_schema()?;
         let params = vec![
-            ((true, true), (false, false), (true, true)),
-            ((true, true), (false, true), (false, false)),
-            ((true, true), (true, false), (false, false)),
-            ((true, false), (false, true), (true, true)),
-            ((true, false), (false, false), (false, false)),
-            ((true, false), (true, true), (false, false)),
+            ((true, true), (false, false), Some(true)),
+            ((true, true), (false, true), None),
+            ((true, true), (true, false), None),
+            ((true, false), (false, true), Some(true)),
+            ((true, false), (false, false), None),
+            ((true, false), (true, true), None),
         ];
         for (
             (physical_desc, physical_nulls_first),
             (req_desc, req_nulls_first),
-            (is_aligned_expected, reverse_expected),
+            expected,
         ) in params
         {
             let physical_ordering = PhysicalSortExpr {
@@ -952,10 +946,8 @@ mod tests {
                     nulls_first: req_nulls_first,
                 },
             };
-            let (is_aligned, reverse) =
-                check_alignment(&schema, &physical_ordering, &required_ordering);
-            assert_eq!(is_aligned, is_aligned_expected);
-            assert_eq!(reverse, reverse_expected);
+            let res = check_alignment(&schema, &physical_ordering, &required_ordering)?;
+            assert_eq!(res, expected);
         }
 
         Ok(())
@@ -966,17 +958,17 @@ mod tests {
         let schema = create_test_schema()?;
 
         let params = vec![
-            ((true, true), (false, false), (true, true)),
-            ((true, true), (false, true), (true, true)),
-            ((true, true), (true, false), (true, false)),
-            ((true, false), (false, true), (true, true)),
-            ((true, false), (false, false), (true, true)),
-            ((true, false), (true, true), (true, false)),
+            ((true, true), (false, false), Some(true)),
+            ((true, true), (false, true), Some(true)),
+            ((true, true), (true, false), Some(false)),
+            ((true, false), (false, true), Some(true)),
+            ((true, false), (false, false), Some(true)),
+            ((true, false), (true, true), Some(false)),
         ];
         for (
             (physical_desc, physical_nulls_first),
             (req_desc, req_nulls_first),
-            (is_aligned_expected, reverse_expected),
+            expected,
         ) in params
         {
             let physical_ordering = PhysicalSortExpr {
@@ -993,10 +985,8 @@ mod tests {
                     nulls_first: req_nulls_first,
                 },
             };
-            let (is_aligned, reverse) =
-                check_alignment(&schema, &physical_ordering, &required_ordering);
-            assert_eq!(is_aligned, is_aligned_expected);
-            assert_eq!(reverse, reverse_expected);
+            let res = check_alignment(&schema, &physical_ordering, &required_ordering)?;
+            assert_eq!(res, expected);
         }
 
         Ok(())
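
For context on this file's changes as a whole: both `can_skip_sort` and `check_alignment` now fold the old `(can_skip, should_reverse)` pair into a single `Option<bool>` via `bool::then_some`. A tiny sketch of the encoding and of how a caller unpacks it (the `encode` function and the printed messages are illustrative only):

    fn encode(can_skip: bool, should_reverse: bool) -> Option<bool> {
        // None        -> the sort cannot be removed
        // Some(false) -> the sort can be removed as-is
        // Some(true)  -> the sort can be removed if the window expression is reversed
        can_skip.then_some(should_reverse)
    }

    fn main() {
        assert_eq!(encode(false, true), None);
        assert_eq!(encode(true, false), Some(false));
        assert_eq!(encode(true, true), Some(true));

        match encode(true, true) {
            None => println!("keep the SortExec"),
            Some(false) => println!("drop the SortExec"),
            Some(true) => println!("drop the SortExec and reverse the window expression"),
        }
    }

Collapsing the pair also removes the invalid state where a `should_reverse` value was carried around even though the sort could not be skipped.
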
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 4b01d3b4ed..443327e516 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -30,12 +30,9 @@ use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
     RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
 };
-use arrow::array::Array;
-use arrow::compute::{
-    concat, concat_batches, lexicographical_partition_ranges, SortColumn,
-};
 use arrow::{
-    array::ArrayRef,
+    array::{Array, ArrayRef},
+    compute::{concat, concat_batches, SortColumn},
     datatypes::{Schema, SchemaRef},
     record_batch::RecordBatch,
 };
@@ -45,12 +42,12 @@ use futures::{ready, StreamExt};
 use std::any::Any;
 use std::cmp::min;
 use std::collections::HashMap;
-use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::physical_plan::windows::calc_requirements;
+use datafusion_common::utils::evaluate_partition_ranges;
 use datafusion_physical_expr::window::{
     PartitionBatchState, PartitionBatches, PartitionKey, PartitionWindowAggStates,
     WindowAggState, WindowState,
@@ -366,7 +363,7 @@ impl PartitionByHandler for SortedPartitionByBoundedWindowStream {
         let num_rows = record_batch.num_rows();
         if num_rows > 0 {
             let partition_points =
-                self.evaluate_partition_points(num_rows, &partition_columns)?;
+                evaluate_partition_ranges(num_rows, &partition_columns)?;
             for partition_range in partition_points {
                 let partition_row = partition_columns
                     .iter()
@@ -628,23 +625,6 @@ impl SortedPartitionByBoundedWindowStream {
             .map(|e| e.evaluate_to_sort_column(batch))
             .collect::<Result<Vec<_>>>()
     }
-
-    /// evaluate the partition points given the sort columns; if the sort columns are
-    /// empty then the result will be a single element vec of the whole column rows.
-    fn evaluate_partition_points(
-        &self,
-        num_rows: usize,
-        partition_columns: &[SortColumn],
-    ) -> Result<Vec<Range<usize>>> {
-        Ok(if partition_columns.is_empty() {
-            vec![Range {
-                start: 0,
-                end: num_rows,
-            }]
-        } else {
-            lexicographical_partition_ranges(partition_columns)?.collect()
-        })
-    }
 }
 
 impl RecordBatchStream for SortedPartitionByBoundedWindowStream {
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 75598f1d52..9e738cafd9 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -30,21 +30,19 @@ use crate::physical_plan::{
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics, WindowExpr,
 };
-use arrow::compute::{
-    concat, concat_batches, lexicographical_partition_ranges, SortColumn,
-};
+use arrow::compute::{concat, concat_batches};
 use arrow::error::ArrowError;
 use arrow::{
     array::ArrayRef,
     datatypes::{Schema, SchemaRef},
     record_batch::RecordBatch,
 };
+use datafusion_common::utils::evaluate_partition_ranges;
 use datafusion_common::DataFusionError;
 use datafusion_physical_expr::PhysicalSortRequirement;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use std::any::Any;
-use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -336,7 +334,7 @@ impl WindowAggStream {
             .map(|elem| elem.evaluate_to_sort_column(&batch))
             .collect::<Result<Vec<_>>>()?;
         let partition_points =
-            self.evaluate_partition_points(batch.num_rows(), &partition_by_sort_keys)?;
+            evaluate_partition_ranges(batch.num_rows(), &partition_by_sort_keys)?;
 
         let mut partition_results = vec![];
         // Calculate window cols
@@ -362,25 +360,6 @@ impl WindowAggStream {
         batch_columns.extend_from_slice(&columns);
         Ok(RecordBatch::try_new(self.schema.clone(), batch_columns)?)
     }
-
-    /// Evaluates the partition points given the sort columns. If the sort columns are
-    /// empty, then the result will be a single element vector spanning the entire batch.
-    fn evaluate_partition_points(
-        &self,
-        num_rows: usize,
-        partition_columns: &[SortColumn],
-    ) -> Result<Vec<Range<usize>>> {
-        Ok(if partition_columns.is_empty() {
-            vec![Range {
-                start: 0,
-                end: num_rows,
-            }]
-        } else {
-            lexicographical_partition_ranges(partition_columns)
-                .map_err(DataFusionError::ArrowError)?
-                .collect::<Vec<_>>()
-        })
-    }
 }
 
 impl Stream for WindowAggStream {
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index 329eac3334..1576e8e9c4 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -35,6 +35,7 @@ use arrow::array::{new_empty_array, Array, ArrayRef};
 use arrow::compute::SortOptions;
 use arrow::datatypes::Field;
 use arrow::record_batch::RecordBatch;
+use datafusion_common::utils::evaluate_partition_ranges;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::WindowFrame;
 
@@ -122,8 +123,7 @@ impl WindowExpr for BuiltInWindowExpr {
             ScalarValue::iter_to_array(row_wise_results.into_iter())
         } else if evaluator.include_rank() {
             let columns = self.sort_columns(batch)?;
-            let sort_partition_points =
-                self.evaluate_partition_points(num_rows, &columns)?;
+            let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
             evaluator.evaluate_with_rank(num_rows, &sort_partition_points)
         } else {
             let (values, _) = self.get_values_orderbys(batch)?;
@@ -168,7 +168,7 @@ impl WindowExpr for BuiltInWindowExpr {
             let num_rows = record_batch.num_rows();
             let sort_partition_points = if evaluator.include_rank() {
                 let columns = self.sort_columns(record_batch)?;
-                self.evaluate_partition_points(num_rows, &columns)?
+                evaluate_partition_ranges(num_rows, &columns)?
             } else {
                 vec![]
             };
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 7568fa3b2b..45e6f46ef0 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -19,13 +19,12 @@ use crate::window::partition_evaluator::PartitionEvaluator;
 use crate::window::window_frame_state::WindowFrameContext;
 use crate::{PhysicalExpr, PhysicalSortExpr};
 use arrow::array::{new_empty_array, Array, ArrayRef};
-use arrow::compute::kernels::partition::lexicographical_partition_ranges;
 use arrow::compute::kernels::sort::SortColumn;
 use arrow::compute::{concat, SortOptions};
 use arrow::datatypes::Field;
 use arrow::record_batch::RecordBatch;
 use arrow_schema::DataType;
-use datafusion_common::{reverse_sort_options, DataFusionError, Result, ScalarValue};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::{Accumulator, WindowFrame};
 use indexmap::IndexMap;
 use std::any::Any;
@@ -80,25 +79,6 @@ pub trait WindowExpr: Send + Sync + Debug {
         )))
     }
 
-    /// Evaluate the partition points given the sort columns; if the sort columns are
-    /// empty then the result will be a single element `Vec` of the whole column rows.
-    fn evaluate_partition_points(
-        &self,
-        num_rows: usize,
-        partition_columns: &[SortColumn],
-    ) -> Result<Vec<Range<usize>>> {
-        if partition_columns.is_empty() {
-            Ok(vec![Range {
-                start: 0,
-                end: num_rows,
-            }])
-        } else {
-            Ok(lexicographical_partition_ranges(partition_columns)
-                .map_err(DataFusionError::ArrowError)?
-                .collect::<Vec<_>>())
-        }
-    }
-
     /// Expressions that's from the window function's partition by clause, empty if absent
     fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
 
@@ -281,7 +261,7 @@ pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr
         .iter()
         .map(|e| PhysicalSortExpr {
             expr: e.expr.clone(),
-            options: reverse_sort_options(e.options),
+            options: !e.options,
         })
         .collect()
 }