You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/10 04:42:40 UTC
[arrow-datafusion] branch main updated: Remove the PhysicalSortExpr restriction on union get meet (#6273)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b1af8de64d Remove the PhysicalSortExpr restriction on union get meet (#6273)
b1af8de64d is described below
commit b1af8de64dbca1480b45c1c62f2d75915a7ab48f
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Wed May 10 07:42:34 2023 +0300
Remove the PhysicalSortExpr restriction on union get meet (#6273)
* get_meet_of_orderings_helper now checks the index and sort options of the columns
* simplifications
* Update datafusion/core/src/physical_plan/common.rs
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
* handle non-column cases
* a plan test is added
* Simplify schema alignment check
---------
Co-authored-by: Mustafa Akur <mu...@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
datafusion/core/src/physical_plan/common.rs | 193 ++++++++++++++++++---
.../core/tests/sqllogictests/test_files/union.slt | 73 ++++++++
2 files changed, 240 insertions(+), 26 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index ce1299fb6d..18b13444d6 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -25,7 +25,8 @@ use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statist
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::expressions::{BinaryExpr, Column};
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Future, StreamExt, TryStreamExt};
use log::debug;
use parking_lot::Mutex;
@@ -252,20 +253,53 @@ fn get_meet_of_orderings_helper(
for ordering in orderings.iter() {
if idx >= ordering.len() {
return Some(ordering);
- } else if ordering[idx] != first[idx] {
- return if idx > 0 {
- Some(&ordering[..idx])
- } else {
- None
- };
+ } else {
+ let schema_aligned = check_expr_alignment(
+ ordering[idx].expr.as_ref(),
+ first[idx].expr.as_ref(),
+ );
+ if !schema_aligned || (ordering[idx].options != first[idx].options) {
+ // In a union, the output schema is that of the first child (by convention).
+ // Therefore, generate the result from the first child's schema:
+ return if idx > 0 { Some(&first[..idx]) } else { None };
+ }
}
}
idx += 1;
}
+
+ fn check_expr_alignment(first: &dyn PhysicalExpr, second: &dyn PhysicalExpr) -> bool {
+ match (
+ first.as_any().downcast_ref::<Column>(),
+ second.as_any().downcast_ref::<Column>(),
+ first.as_any().downcast_ref::<BinaryExpr>(),
+ second.as_any().downcast_ref::<BinaryExpr>(),
+ ) {
+ (Some(first_col), Some(second_col), _, _) => {
+ first_col.index() == second_col.index()
+ }
+ (_, _, Some(first_binary), Some(second_binary)) => {
+ if first_binary.op() == second_binary.op() {
+ check_expr_alignment(
+ first_binary.left().as_ref(),
+ second_binary.left().as_ref(),
+ ) && check_expr_alignment(
+ first_binary.right().as_ref(),
+ second_binary.right().as_ref(),
+ )
+ } else {
+ false
+ }
+ }
+ (_, _, _, _) => false,
+ }
+ }
}
#[cfg(test)]
mod tests {
+ use std::ops::Not;
+
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::memory::MemoryExec;
@@ -277,6 +311,7 @@ mod tests {
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
+ use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, Column};
#[test]
@@ -298,41 +333,80 @@ mod tests {
let input2: Vec<PhysicalSortExpr> = vec![
PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
+ expr: Arc::new(Column::new("x", 0)),
options: SortOptions::default(),
},
PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
+ expr: Arc::new(Column::new("y", 1)),
options: SortOptions::default(),
},
PhysicalSortExpr {
- expr: Arc::new(Column::new("y", 2)),
+ expr: Arc::new(Column::new("z", 2)),
options: SortOptions::default(),
},
];
let input3: Vec<PhysicalSortExpr> = vec![
PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
+ expr: Arc::new(Column::new("d", 0)),
options: SortOptions::default(),
},
PhysicalSortExpr {
- expr: Arc::new(Column::new("x", 1)),
+ expr: Arc::new(Column::new("e", 1)),
options: SortOptions::default(),
},
PhysicalSortExpr {
- expr: Arc::new(Column::new("y", 2)),
+ expr: Arc::new(Column::new("f", 2)),
options: SortOptions::default(),
},
];
- let expected = vec![PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
- options: SortOptions::default(),
- }];
+ let input4: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("g", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("h", 1)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ // Note that index of this column is not 2. Hence this 3rd entry shouldn't be
+ // in the output ordering.
+ expr: Arc::new(Column::new("i", 3)),
+ options: SortOptions::default(),
+ },
+ ];
+ let expected = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: SortOptions::default(),
+ },
+ ];
let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
assert_eq!(result.unwrap(), expected);
+
+ let expected = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("a", 0)),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("b", 1)),
+ options: SortOptions::default(),
+ },
+ ];
+ let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input4]);
+ assert_eq!(result.unwrap(), expected);
Ok(())
}
@@ -351,30 +425,30 @@ mod tests {
let input2: Vec<PhysicalSortExpr> = vec![
PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
+ expr: Arc::new(Column::new("c", 0)),
options: SortOptions::default(),
},
PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
+ expr: Arc::new(Column::new("d", 1)),
options: SortOptions::default(),
},
PhysicalSortExpr {
- expr: Arc::new(Column::new("c", 2)),
+ expr: Arc::new(Column::new("e", 2)),
options: SortOptions::default(),
},
];
let input3: Vec<PhysicalSortExpr> = vec![
PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
+ expr: Arc::new(Column::new("f", 0)),
options: SortOptions::default(),
},
PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
+ expr: Arc::new(Column::new("g", 1)),
options: SortOptions::default(),
},
PhysicalSortExpr {
- expr: Arc::new(Column::new("d", 2)),
+ expr: Arc::new(Column::new("h", 2)),
options: SortOptions::default(),
},
];
@@ -389,7 +463,9 @@ mod tests {
let input1: Vec<PhysicalSortExpr> = vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
- options: SortOptions::default(),
+ // Since ordering is conflicting with other inputs
+ // output ordering should be empty
+ options: SortOptions::default().not(),
},
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
@@ -410,7 +486,7 @@ mod tests {
let input3: Vec<PhysicalSortExpr> = vec![
PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
+ expr: Arc::new(Column::new("a", 2)),
options: SortOptions::default(),
},
PhysicalSortExpr {
@@ -419,7 +495,72 @@ mod tests {
},
];
- let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
+ let result = get_meet_of_orderings_helper(vec![&input1, &input2]);
+ assert!(result.is_none());
+
+ let result = get_meet_of_orderings_helper(vec![&input2, &input3]);
+ assert!(result.is_none());
+
+ let result = get_meet_of_orderings_helper(vec![&input1, &input3]);
+ assert!(result.is_none());
+ Ok(())
+ }
+
+ #[test]
+ fn get_meet_of_orderings_helper_binary_exprs() -> Result<()> {
+ let input1: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 0)),
+ Operator::Plus,
+ Arc::new(Column::new("b", 1)),
+ )),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let input2: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("x", 0)),
+ Operator::Plus,
+ Arc::new(Column::new("y", 1)),
+ )),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("z", 2)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ // erroneous input
+ let input3: Vec<PhysicalSortExpr> = vec![
+ PhysicalSortExpr {
+ expr: Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a", 1)),
+ Operator::Plus,
+ Arc::new(Column::new("b", 0)),
+ )),
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: Arc::new(Column::new("c", 2)),
+ options: SortOptions::default(),
+ },
+ ];
+
+ let result = get_meet_of_orderings_helper(vec![&input1, &input2]);
+ assert_eq!(input1, result.unwrap());
+
+ let result = get_meet_of_orderings_helper(vec![&input2, &input3]);
+ assert!(result.is_none());
+
+ let result = get_meet_of_orderings_helper(vec![&input1, &input3]);
assert!(result.is_none());
Ok(())
}
diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt
index 4860627cff..a9e247d478 100644
--- a/datafusion/core/tests/sqllogictests/test_files/union.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -373,3 +373,76 @@ drop table t2;
statement ok
drop table aggregate_test_100;
+
+statement ok
+CREATE EXTERNAL TABLE t1 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT NOT NULL,
+ c5 INTEGER NOT NULL,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 INT UNSIGNED NOT NULL,
+ c10 BIGINT UNSIGNED NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+ )
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (c1 ASC)
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+statement ok
+CREATE EXTERNAL TABLE t2 (
+ c1a VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT NOT NULL,
+ c5 INTEGER NOT NULL,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 INT UNSIGNED NOT NULL,
+ c10 BIGINT UNSIGNED NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+ )
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (c1a ASC)
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+query TT
+explain
+SELECT c1 FROM(
+(
+ SELECT c1 FROM t1
+)
+UNION ALL
+(
+ SELECT c1a FROM t2
+))
+ORDER BY c1
+----
+logical_plan
+ Sort: t1.c1 ASC NULLS LAST
+ Union
+ TableScan: t1 projection=[c1]
+ Projection: t2.c1a AS t1.c1
+ TableScan: t2 projection=[c1a]
+physical_plan
+ SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
+ UnionExec
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+ ProjectionExec: expr=[c1a@0 as t1.c1]
+ CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a], output_ordering=[c1a@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table t1
+
+statement ok
+drop table t2
\ No newline at end of file