You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/04/06 10:07:38 UTC
[arrow-datafusion] branch main updated: [MINOR]: Refactor to increase readability (#5874)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e84ef3742a [MINOR]: Refactor to increase readability (#5874)
e84ef3742a is described below
commit e84ef3742ae76dd0f5fdf98f7826afcefa59200e
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Thu Apr 6 13:07:31 2023 +0300
[MINOR]: Refactor to increase readability (#5874)
* Change return api for util codes
* Refactor
* make evaluate_partition points function
* rename function
* Simplify sort removal logic for windowing
* Rename function
---------
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion/common/src/lib.rs | 10 --
datafusion/common/src/utils.rs | 55 +++++++-
.../src/physical_optimizer/sort_enforcement.rs | 138 ++++++++++-----------
.../windows/bounded_window_agg_exec.rs | 28 +----
.../src/physical_plan/windows/window_agg_exec.rs | 27 +---
datafusion/physical-expr/src/window/built_in.rs | 6 +-
datafusion/physical-expr/src/window/window_expr.rs | 24 +---
7 files changed, 130 insertions(+), 158 deletions(-)
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 2b227fd1db..1b59f43a67 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -32,7 +32,6 @@ pub mod test_util;
pub mod tree_node;
pub mod utils;
-use arrow::compute::SortOptions;
pub use column::Column;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
pub use error::{
@@ -68,12 +67,3 @@ macro_rules! downcast_value {
})?
}};
}
-
-/// Computes the "reverse" of given `SortOptions`.
-// TODO: If/when arrow supports `!` for `SortOptions`, we can remove this.
-pub fn reverse_sort_options(options: SortOptions) -> SortOptions {
- SortOptions {
- descending: !options.descending,
- nulls_first: !options.nulls_first,
- }
-}
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 6872653fb4..3451152686 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -19,13 +19,14 @@
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::ArrayRef;
-use arrow::compute::SortOptions;
+use arrow::compute::{lexicographical_partition_ranges, SortColumn, SortOptions};
use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::{Parser, ParserError};
use sqlparser::tokenizer::{Token, TokenWithLocation};
use std::borrow::Cow;
use std::cmp::Ordering;
+use std::ops::Range;
/// Given column vectors, returns row at `idx`.
pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> {
@@ -163,6 +164,23 @@ where
Ok(low)
}
+/// This function finds the partition ranges according to `partition_columns`.
+/// If `partition_columns` is empty, then the result will be a single element
+/// vector containing one partition range spanning all `num_rows` rows.
+pub fn evaluate_partition_ranges(
+ num_rows: usize,
+ partition_columns: &[SortColumn],
+) -> Result<Vec<Range<usize>>> {
+ Ok(if partition_columns.is_empty() {
+ vec![Range {
+ start: 0,
+ end: num_rows,
+ }]
+ } else {
+ lexicographical_partition_ranges(partition_columns)?.collect()
+ })
+}
+
/// Wraps identifier string in double quotes, escaping any double quotes in
/// the identifier by replacing it with two double quotes
///
@@ -256,6 +274,7 @@ pub(crate) fn parse_identifiers_normalized(s: &str) -> Vec<String> {
#[cfg(test)]
mod tests {
use arrow::array::Float64Array;
+ use arrow_array::Array;
use std::sync::Arc;
use crate::from_slice::FromSlice;
@@ -426,6 +445,40 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_evaluate_partition_ranges() -> Result<()> {
+ let arrays: Vec<ArrayRef> = vec![
+ Arc::new(Float64Array::from_slice([1.0, 1.0, 1.0, 2.0, 2.0, 2.0])),
+ Arc::new(Float64Array::from_slice([4.0, 4.0, 3.0, 2.0, 1.0, 1.0])),
+ ];
+ let n_row = arrays[0].len();
+ let options: Vec<SortOptions> = vec![
+ SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ];
+ let sort_columns = arrays
+ .into_iter()
+ .zip(options)
+ .map(|(values, options)| SortColumn {
+ values,
+ options: Some(options),
+ })
+ .collect::<Vec<_>>();
+ let ranges = evaluate_partition_ranges(n_row, &sort_columns)?;
+ assert_eq!(ranges.len(), 4);
+ assert_eq!(ranges[0], Range { start: 0, end: 2 });
+ assert_eq!(ranges[1], Range { start: 2, end: 3 });
+ assert_eq!(ranges[2], Range { start: 3, end: 4 });
+ assert_eq!(ranges[3], Range { start: 4, end: 6 });
+ Ok(())
+ }
+
#[test]
fn test_parse_identifiers() -> Result<()> {
let s = "CATALOG.\"F(o)o. \"\"bar\".table";
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 7428c339dc..4817c01e5c 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -48,7 +48,7 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
-use datafusion_common::{reverse_sort_options, DataFusionError};
+use datafusion_common::DataFusionError;
use datafusion_physical_expr::utils::{
make_sort_exprs_from_requirements, ordering_satisfy,
ordering_satisfy_requirement_concrete,
@@ -587,7 +587,7 @@ fn analyze_window_sort_removal(
.map(|elem| elem.len())
.unwrap_or(0);
- let mut first_should_reverse = None;
+ let mut needs_reverse = None;
for sort_any in sort_tree.get_leaves() {
let sort_output_ordering = sort_any.output_ordering();
// Variable `sort_any` will either be a `SortExec` or a
@@ -598,32 +598,27 @@ fn analyze_window_sort_removal(
let sort_output_ordering = sort_output_ordering.ok_or_else(|| {
DataFusionError::Plan("A SortExec should have output ordering".to_string())
})?;
- // It is enough to check whether first n_req element of the sort output satisfies window_exec requirement.
- // Because length of window_exec requirement is n_req.
+ // It is enough to check whether the first "n_req" elements of the sort
+ // output satisfy window_exec's requirement as it is only "n_req" long.
let required_ordering = &sort_output_ordering[0..n_req];
if let Some(physical_ordering) = physical_ordering {
- let (can_skip_sorting, should_reverse) = can_skip_sort(
+ if let Some(should_reverse) = can_skip_sort(
window_expr[0].partition_by(),
required_ordering,
&sort_input.schema(),
physical_ordering,
- )?;
- if !can_skip_sorting {
- return Ok(None);
- } else if let Some(first_should_reverse) = first_should_reverse {
- if first_should_reverse != should_reverse {
- return Ok(None);
+ )? {
+ if should_reverse == *needs_reverse.get_or_insert(should_reverse) {
+ continue;
}
- } else {
- first_should_reverse = Some(should_reverse);
}
- } else {
- // If there is no physical ordering, there is no way to remove a
- // sort, so immediately return.
- return Ok(None);
}
+ // If there is no physical ordering, or we cannot skip the sort, or
+ // window reversal requirements are not uniform, then there is no
+ // opportunity for a sort removal -- we immediately return.
+ return Ok(None);
}
- let new_window_expr = if first_should_reverse.unwrap() {
+ let new_window_expr = if needs_reverse.unwrap() {
window_expr
.iter()
.map(|e| e.get_reverse_expr())
@@ -784,39 +779,40 @@ fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&[PhysicalSortExp
#[derive(Debug)]
/// This structure stores extra column information required to remove unnecessary sorts.
pub struct ColumnInfo {
- is_aligned: bool,
reverse: bool,
is_partition: bool,
}
-/// Compares physical ordering and required ordering of all `PhysicalSortExpr`s and returns a tuple.
-/// The first element indicates whether these `PhysicalSortExpr`s can be removed from the physical plan.
-/// The second element is a flag indicating whether we should reverse the sort direction in order to
-/// remove physical sort expressions from the plan.
+/// Compares physical ordering and required ordering of all `PhysicalSortExpr`s
+/// to decide whether a `SortExec` before a `WindowAggExec` can be removed.
+/// A `None` return value indicates that the sort in question cannot be removed.
+/// A `Some(bool)` value indicates that it can be removed, and signals whether
+/// we need to reverse the ordering in order to remove the sort in question.
pub fn can_skip_sort(
partition_keys: &[Arc<dyn PhysicalExpr>],
required: &[PhysicalSortExpr],
input_schema: &SchemaRef,
physical_ordering: &[PhysicalSortExpr],
-) -> Result<(bool, bool)> {
+) -> Result<Option<bool>> {
if required.len() > physical_ordering.len() {
- return Ok((false, false));
+ return Ok(None);
}
let mut col_infos = vec![];
for (sort_expr, physical_expr) in zip(required, physical_ordering) {
let column = sort_expr.expr.clone();
let is_partition = partition_keys.iter().any(|e| e.eq(&column));
- let (is_aligned, reverse) =
- check_alignment(input_schema, physical_expr, sort_expr);
- col_infos.push(ColumnInfo {
- is_aligned,
- reverse,
- is_partition,
- });
+ if let Some(reverse) = check_alignment(input_schema, physical_expr, sort_expr)? {
+ col_infos.push(ColumnInfo {
+ reverse,
+ is_partition,
+ });
+ } else {
+ return Ok(None);
+ }
}
let partition_by_sections = col_infos
.iter()
- .filter(|elem| elem.is_partition)
+ .filter(|c| c.is_partition)
.collect::<Vec<_>>();
let can_skip_partition_bys = if partition_by_sections.is_empty() {
true
@@ -824,7 +820,7 @@ pub fn can_skip_sort(
let first_reverse = partition_by_sections[0].reverse;
let can_skip_partition_bys = partition_by_sections
.iter()
- .all(|c| c.is_aligned && c.reverse == first_reverse);
+ .all(|c| c.reverse == first_reverse);
can_skip_partition_bys
};
let order_by_sections = col_infos
@@ -835,38 +831,36 @@ pub fn can_skip_sort(
(true, false)
} else {
let first_reverse = order_by_sections[0].reverse;
- let can_skip_order_bys = order_by_sections
- .iter()
- .all(|c| c.is_aligned && c.reverse == first_reverse);
+ let can_skip_order_bys =
+ order_by_sections.iter().all(|c| c.reverse == first_reverse);
(can_skip_order_bys, first_reverse)
};
let can_skip = can_skip_order_bys && can_skip_partition_bys;
- Ok((can_skip, should_reverse_order_bys))
+ Ok(can_skip.then_some(should_reverse_order_bys))
}
-/// Compares `physical_ordering` and `required` ordering, returns a tuple
-/// indicating (1) whether this column requires sorting, and (2) whether we
-/// should reverse the window expression in order to avoid sorting.
+/// Compares `physical_ordering` and `required` ordering, and decides whether
+/// they are aligned. A `None` return value indicates that the current column
+/// is not aligned. A `Some(bool)` value indicates otherwise, and signals whether
+/// we should reverse the window expression in order to avoid sorting.
fn check_alignment(
input_schema: &SchemaRef,
physical_ordering: &PhysicalSortExpr,
required: &PhysicalSortExpr,
-) -> (bool, bool) {
- if required.expr.eq(&physical_ordering.expr) {
- let nullable = required.expr.nullable(input_schema).unwrap();
+) -> Result<Option<bool>> {
+ Ok(if required.expr.eq(&physical_ordering.expr) {
let physical_opts = physical_ordering.options;
let required_opts = required.options;
- let is_reversed = if nullable {
- physical_opts == reverse_sort_options(required_opts)
+ if required.expr.nullable(input_schema)? {
+ let reverse = physical_opts == !required_opts;
+ (reverse || physical_opts == required_opts).then_some(reverse)
} else {
// If the column is not nullable, NULLS FIRST/LAST is not important.
- physical_opts.descending != required_opts.descending
- };
- let can_skip = !nullable || is_reversed || (physical_opts == required_opts);
- (can_skip, is_reversed)
+ Some(physical_opts.descending != required_opts.descending)
+ }
} else {
- (false, false)
- }
+ None
+ })
}
#[cfg(test)]
@@ -925,17 +919,17 @@ mod tests {
async fn test_is_column_aligned_nullable() -> Result<()> {
let schema = create_test_schema()?;
let params = vec![
- ((true, true), (false, false), (true, true)),
- ((true, true), (false, true), (false, false)),
- ((true, true), (true, false), (false, false)),
- ((true, false), (false, true), (true, true)),
- ((true, false), (false, false), (false, false)),
- ((true, false), (true, true), (false, false)),
+ ((true, true), (false, false), Some(true)),
+ ((true, true), (false, true), None),
+ ((true, true), (true, false), None),
+ ((true, false), (false, true), Some(true)),
+ ((true, false), (false, false), None),
+ ((true, false), (true, true), None),
];
for (
(physical_desc, physical_nulls_first),
(req_desc, req_nulls_first),
- (is_aligned_expected, reverse_expected),
+ expected,
) in params
{
let physical_ordering = PhysicalSortExpr {
@@ -952,10 +946,8 @@ mod tests {
nulls_first: req_nulls_first,
},
};
- let (is_aligned, reverse) =
- check_alignment(&schema, &physical_ordering, &required_ordering);
- assert_eq!(is_aligned, is_aligned_expected);
- assert_eq!(reverse, reverse_expected);
+ let res = check_alignment(&schema, &physical_ordering, &required_ordering)?;
+ assert_eq!(res, expected);
}
Ok(())
@@ -966,17 +958,17 @@ mod tests {
let schema = create_test_schema()?;
let params = vec![
- ((true, true), (false, false), (true, true)),
- ((true, true), (false, true), (true, true)),
- ((true, true), (true, false), (true, false)),
- ((true, false), (false, true), (true, true)),
- ((true, false), (false, false), (true, true)),
- ((true, false), (true, true), (true, false)),
+ ((true, true), (false, false), Some(true)),
+ ((true, true), (false, true), Some(true)),
+ ((true, true), (true, false), Some(false)),
+ ((true, false), (false, true), Some(true)),
+ ((true, false), (false, false), Some(true)),
+ ((true, false), (true, true), Some(false)),
];
for (
(physical_desc, physical_nulls_first),
(req_desc, req_nulls_first),
- (is_aligned_expected, reverse_expected),
+ expected,
) in params
{
let physical_ordering = PhysicalSortExpr {
@@ -993,10 +985,8 @@ mod tests {
nulls_first: req_nulls_first,
},
};
- let (is_aligned, reverse) =
- check_alignment(&schema, &physical_ordering, &required_ordering);
- assert_eq!(is_aligned, is_aligned_expected);
- assert_eq!(reverse, reverse_expected);
+ let res = check_alignment(&schema, &physical_ordering, &required_ordering)?;
+ assert_eq!(res, expected);
}
Ok(())
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 4b01d3b4ed..443327e516 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -30,12 +30,9 @@ use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
};
-use arrow::array::Array;
-use arrow::compute::{
- concat, concat_batches, lexicographical_partition_ranges, SortColumn,
-};
use arrow::{
- array::ArrayRef,
+ array::{Array, ArrayRef},
+ compute::{concat, concat_batches, SortColumn},
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
};
@@ -45,12 +42,12 @@ use futures::{ready, StreamExt};
use std::any::Any;
use std::cmp::min;
use std::collections::HashMap;
-use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::physical_plan::windows::calc_requirements;
+use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_physical_expr::window::{
PartitionBatchState, PartitionBatches, PartitionKey, PartitionWindowAggStates,
WindowAggState, WindowState,
@@ -366,7 +363,7 @@ impl PartitionByHandler for SortedPartitionByBoundedWindowStream {
let num_rows = record_batch.num_rows();
if num_rows > 0 {
let partition_points =
- self.evaluate_partition_points(num_rows, &partition_columns)?;
+ evaluate_partition_ranges(num_rows, &partition_columns)?;
for partition_range in partition_points {
let partition_row = partition_columns
.iter()
@@ -628,23 +625,6 @@ impl SortedPartitionByBoundedWindowStream {
.map(|e| e.evaluate_to_sort_column(batch))
.collect::<Result<Vec<_>>>()
}
-
- /// evaluate the partition points given the sort columns; if the sort columns are
- /// empty then the result will be a single element vec of the whole column rows.
- fn evaluate_partition_points(
- &self,
- num_rows: usize,
- partition_columns: &[SortColumn],
- ) -> Result<Vec<Range<usize>>> {
- Ok(if partition_columns.is_empty() {
- vec![Range {
- start: 0,
- end: num_rows,
- }]
- } else {
- lexicographical_partition_ranges(partition_columns)?.collect()
- })
- }
}
impl RecordBatchStream for SortedPartitionByBoundedWindowStream {
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 75598f1d52..9e738cafd9 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -30,21 +30,19 @@ use crate::physical_plan::{
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics, WindowExpr,
};
-use arrow::compute::{
- concat, concat_batches, lexicographical_partition_ranges, SortColumn,
-};
+use arrow::compute::{concat, concat_batches};
use arrow::error::ArrowError;
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
};
+use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::DataFusionError;
use datafusion_physical_expr::PhysicalSortRequirement;
use futures::stream::Stream;
use futures::{ready, StreamExt};
use std::any::Any;
-use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -336,7 +334,7 @@ impl WindowAggStream {
.map(|elem| elem.evaluate_to_sort_column(&batch))
.collect::<Result<Vec<_>>>()?;
let partition_points =
- self.evaluate_partition_points(batch.num_rows(), &partition_by_sort_keys)?;
+ evaluate_partition_ranges(batch.num_rows(), &partition_by_sort_keys)?;
let mut partition_results = vec![];
// Calculate window cols
@@ -362,25 +360,6 @@ impl WindowAggStream {
batch_columns.extend_from_slice(&columns);
Ok(RecordBatch::try_new(self.schema.clone(), batch_columns)?)
}
-
- /// Evaluates the partition points given the sort columns. If the sort columns are
- /// empty, then the result will be a single element vector spanning the entire batch.
- fn evaluate_partition_points(
- &self,
- num_rows: usize,
- partition_columns: &[SortColumn],
- ) -> Result<Vec<Range<usize>>> {
- Ok(if partition_columns.is_empty() {
- vec![Range {
- start: 0,
- end: num_rows,
- }]
- } else {
- lexicographical_partition_ranges(partition_columns)
- .map_err(DataFusionError::ArrowError)?
- .collect::<Vec<_>>()
- })
- }
}
impl Stream for WindowAggStream {
diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs
index 329eac3334..1576e8e9c4 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -35,6 +35,7 @@ use arrow::array::{new_empty_array, Array, ArrayRef};
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
+use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::WindowFrame;
@@ -122,8 +123,7 @@ impl WindowExpr for BuiltInWindowExpr {
ScalarValue::iter_to_array(row_wise_results.into_iter())
} else if evaluator.include_rank() {
let columns = self.sort_columns(batch)?;
- let sort_partition_points =
- self.evaluate_partition_points(num_rows, &columns)?;
+ let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
evaluator.evaluate_with_rank(num_rows, &sort_partition_points)
} else {
let (values, _) = self.get_values_orderbys(batch)?;
@@ -168,7 +168,7 @@ impl WindowExpr for BuiltInWindowExpr {
let num_rows = record_batch.num_rows();
let sort_partition_points = if evaluator.include_rank() {
let columns = self.sort_columns(record_batch)?;
- self.evaluate_partition_points(num_rows, &columns)?
+ evaluate_partition_ranges(num_rows, &columns)?
} else {
vec![]
};
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 7568fa3b2b..45e6f46ef0 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -19,13 +19,12 @@ use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_frame_state::WindowFrameContext;
use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::array::{new_empty_array, Array, ArrayRef};
-use arrow::compute::kernels::partition::lexicographical_partition_ranges;
use arrow::compute::kernels::sort::SortColumn;
use arrow::compute::{concat, SortOptions};
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use arrow_schema::DataType;
-use datafusion_common::{reverse_sort_options, DataFusionError, Result, ScalarValue};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{Accumulator, WindowFrame};
use indexmap::IndexMap;
use std::any::Any;
@@ -80,25 +79,6 @@ pub trait WindowExpr: Send + Sync + Debug {
)))
}
- /// Evaluate the partition points given the sort columns; if the sort columns are
- /// empty then the result will be a single element `Vec` of the whole column rows.
- fn evaluate_partition_points(
- &self,
- num_rows: usize,
- partition_columns: &[SortColumn],
- ) -> Result<Vec<Range<usize>>> {
- if partition_columns.is_empty() {
- Ok(vec![Range {
- start: 0,
- end: num_rows,
- }])
- } else {
- Ok(lexicographical_partition_ranges(partition_columns)
- .map_err(DataFusionError::ArrowError)?
- .collect::<Vec<_>>())
- }
- }
-
/// Expressions that's from the window function's partition by clause, empty if absent
fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
@@ -281,7 +261,7 @@ pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec<PhysicalSortExpr
.iter()
.map(|e| PhysicalSortExpr {
expr: e.expr.clone(),
- options: reverse_sort_options(e.options),
+ options: !e.options,
})
.collect()
}