You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/10 04:42:40 UTC

[arrow-datafusion] branch main updated: Remove the PhysicalSortExpr restriction on union get meet (#6273)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b1af8de64d Remove the PhysicalSortExpr restriction on union get meet (#6273)
b1af8de64d is described below

commit b1af8de64dbca1480b45c1c62f2d75915a7ab48f
Author: Berkay Şahin <12...@users.noreply.github.com>
AuthorDate: Wed May 10 07:42:34 2023 +0300

    Remove the PhysicalSortExpr restriction on union get meet (#6273)
    
    * get_meet_of_orderings_helper now checks the index and sort options of the columns
    
    * simplifications
    
    * Update datafusion/core/src/physical_plan/common.rs
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
    
    * handle non-column cases
    
    * add a plan test
    
    * Simplify schema alignment check
    
    ---------
    
    Co-authored-by: Mustafa Akur <mu...@synnada.ai>
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 datafusion/core/src/physical_plan/common.rs        | 193 ++++++++++++++++++---
 .../core/tests/sqllogictests/test_files/union.slt  |  73 ++++++++
 2 files changed, 240 insertions(+), 26 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index ce1299fb6d..18b13444d6 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -25,7 +25,8 @@ use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statist
 use arrow::datatypes::Schema;
 use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
 use arrow::record_batch::RecordBatch;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::expressions::{BinaryExpr, Column};
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use futures::{Future, StreamExt, TryStreamExt};
 use log::debug;
 use parking_lot::Mutex;
@@ -252,20 +253,53 @@ fn get_meet_of_orderings_helper(
         for ordering in orderings.iter() {
             if idx >= ordering.len() {
                 return Some(ordering);
-            } else if ordering[idx] != first[idx] {
-                return if idx > 0 {
-                    Some(&ordering[..idx])
-                } else {
-                    None
-                };
+            } else {
+                let schema_aligned = check_expr_alignment(
+                    ordering[idx].expr.as_ref(),
+                    first[idx].expr.as_ref(),
+                );
+                if !schema_aligned || (ordering[idx].options != first[idx].options) {
+                    // In a union, the output schema is that of the first child (by convention).
+                    // Therefore, generate the result from the first child's schema:
+                    return if idx > 0 { Some(&first[..idx]) } else { None };
+                }
             }
         }
         idx += 1;
     }
+
+    fn check_expr_alignment(first: &dyn PhysicalExpr, second: &dyn PhysicalExpr) -> bool {
+        match (
+            first.as_any().downcast_ref::<Column>(),
+            second.as_any().downcast_ref::<Column>(),
+            first.as_any().downcast_ref::<BinaryExpr>(),
+            second.as_any().downcast_ref::<BinaryExpr>(),
+        ) {
+            (Some(first_col), Some(second_col), _, _) => {
+                first_col.index() == second_col.index()
+            }
+            (_, _, Some(first_binary), Some(second_binary)) => {
+                if first_binary.op() == second_binary.op() {
+                    check_expr_alignment(
+                        first_binary.left().as_ref(),
+                        second_binary.left().as_ref(),
+                    ) && check_expr_alignment(
+                        first_binary.right().as_ref(),
+                        second_binary.right().as_ref(),
+                    )
+                } else {
+                    false
+                }
+            }
+            (_, _, _, _) => false,
+        }
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::ops::Not;
+
     use super::*;
     use crate::from_slice::FromSlice;
     use crate::physical_plan::memory::MemoryExec;
@@ -277,6 +311,7 @@ mod tests {
         datatypes::{DataType, Field, Schema},
         record_batch::RecordBatch,
     };
+    use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{col, Column};
 
     #[test]
@@ -298,41 +333,80 @@ mod tests {
 
         let input2: Vec<PhysicalSortExpr> = vec![
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
+                expr: Arc::new(Column::new("x", 0)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
+                expr: Arc::new(Column::new("y", 1)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("y", 2)),
+                expr: Arc::new(Column::new("z", 2)),
                 options: SortOptions::default(),
             },
         ];
 
         let input3: Vec<PhysicalSortExpr> = vec![
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
+                expr: Arc::new(Column::new("d", 0)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("x", 1)),
+                expr: Arc::new(Column::new("e", 1)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("y", 2)),
+                expr: Arc::new(Column::new("f", 2)),
                 options: SortOptions::default(),
             },
         ];
 
-        let expected = vec![PhysicalSortExpr {
-            expr: Arc::new(Column::new("a", 0)),
-            options: SortOptions::default(),
-        }];
+        let input4: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("g", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("h", 1)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                // Note that index of this column is not 2. Hence this 3rd entry shouldn't be
+                // in the output ordering.
+                expr: Arc::new(Column::new("i", 3)),
+                options: SortOptions::default(),
+            },
+        ];
 
+        let expected = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("c", 2)),
+                options: SortOptions::default(),
+            },
+        ];
         let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
         assert_eq!(result.unwrap(), expected);
+
+        let expected = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("a", 0)),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("b", 1)),
+                options: SortOptions::default(),
+            },
+        ];
+        let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input4]);
+        assert_eq!(result.unwrap(), expected);
         Ok(())
     }
 
@@ -351,30 +425,30 @@ mod tests {
 
         let input2: Vec<PhysicalSortExpr> = vec![
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
+                expr: Arc::new(Column::new("c", 0)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
+                expr: Arc::new(Column::new("d", 1)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("c", 2)),
+                expr: Arc::new(Column::new("e", 2)),
                 options: SortOptions::default(),
             },
         ];
 
         let input3: Vec<PhysicalSortExpr> = vec![
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
+                expr: Arc::new(Column::new("f", 0)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
+                expr: Arc::new(Column::new("g", 1)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("d", 2)),
+                expr: Arc::new(Column::new("h", 2)),
                 options: SortOptions::default(),
             },
         ];
@@ -389,7 +463,9 @@ mod tests {
         let input1: Vec<PhysicalSortExpr> = vec![
             PhysicalSortExpr {
                 expr: Arc::new(Column::new("a", 0)),
-                options: SortOptions::default(),
+                // Since ordering is conflicting with other inputs
+                // output ordering should be empty
+                options: SortOptions::default().not(),
             },
             PhysicalSortExpr {
                 expr: Arc::new(Column::new("b", 1)),
@@ -410,7 +486,7 @@ mod tests {
 
         let input3: Vec<PhysicalSortExpr> = vec![
             PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
+                expr: Arc::new(Column::new("a", 2)),
                 options: SortOptions::default(),
             },
             PhysicalSortExpr {
@@ -419,7 +495,72 @@ mod tests {
             },
         ];
 
-        let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
+        let result = get_meet_of_orderings_helper(vec![&input1, &input2]);
+        assert!(result.is_none());
+
+        let result = get_meet_of_orderings_helper(vec![&input2, &input3]);
+        assert!(result.is_none());
+
+        let result = get_meet_of_orderings_helper(vec![&input1, &input3]);
+        assert!(result.is_none());
+        Ok(())
+    }
+
+    #[test]
+    fn get_meet_of_orderings_helper_binary_exprs() -> Result<()> {
+        let input1: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(BinaryExpr::new(
+                    Arc::new(Column::new("a", 0)),
+                    Operator::Plus,
+                    Arc::new(Column::new("b", 1)),
+                )),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("c", 2)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input2: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(BinaryExpr::new(
+                    Arc::new(Column::new("x", 0)),
+                    Operator::Plus,
+                    Arc::new(Column::new("y", 1)),
+                )),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("z", 2)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        // erroneous input
+        let input3: Vec<PhysicalSortExpr> = vec![
+            PhysicalSortExpr {
+                expr: Arc::new(BinaryExpr::new(
+                    Arc::new(Column::new("a", 1)),
+                    Operator::Plus,
+                    Arc::new(Column::new("b", 0)),
+                )),
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("c", 2)),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let result = get_meet_of_orderings_helper(vec![&input1, &input2]);
+        assert_eq!(input1, result.unwrap());
+
+        let result = get_meet_of_orderings_helper(vec![&input2, &input3]);
+        assert!(result.is_none());
+
+        let result = get_meet_of_orderings_helper(vec![&input1, &input3]);
         assert!(result.is_none());
         Ok(())
     }
diff --git a/datafusion/core/tests/sqllogictests/test_files/union.slt b/datafusion/core/tests/sqllogictests/test_files/union.slt
index 4860627cff..a9e247d478 100644
--- a/datafusion/core/tests/sqllogictests/test_files/union.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/union.slt
@@ -373,3 +373,76 @@ drop table t2;
 
 statement ok
 drop table aggregate_test_100;
+
+statement ok
+CREATE EXTERNAL TABLE t1 (
+        c1  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT NOT NULL,
+        c5  INTEGER NOT NULL,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  INT UNSIGNED NOT NULL,
+        c10 BIGINT UNSIGNED NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (c1 ASC)
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+statement ok
+CREATE EXTERNAL TABLE t2 (
+        c1a  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT NOT NULL,
+        c5  INTEGER NOT NULL,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  INT UNSIGNED NOT NULL,
+        c10 BIGINT UNSIGNED NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (c1a ASC)
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+query TT
+explain
+SELECT c1 FROM(
+(   
+    SELECT c1 FROM t1
+)  
+UNION ALL
+(   
+    SELECT c1a FROM t2
+))
+ORDER BY c1
+----
+logical_plan
+  Sort: t1.c1 ASC NULLS LAST
+    Union
+      TableScan: t1 projection=[c1]
+      Projection: t2.c1a AS t1.c1
+        TableScan: t2 projection=[c1a]
+physical_plan
+  SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
+    UnionExec
+      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], has_header=true
+        ProjectionExec: expr=[c1a@0 as t1.c1]
+      CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a], output_ordering=[c1a@0 ASC NULLS LAST], has_header=true
+
+statement ok
+drop table t1
+
+statement ok
+drop table t2
\ No newline at end of file