You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/04/12 20:57:32 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5951: Treat Partition by columns as set for window functions

alamb commented on code in PR #5951:
URL: https://github.com/apache/arrow-datafusion/pull/5951#discussion_r1164633563


##########
datafusion/core/src/physical_optimizer/utils.rs:
##########
@@ -113,3 +175,49 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<RepartitionExec>()
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_find_indices() -> Result<()> {
+        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
+        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
+        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
+        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
+        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_merge_and_order_indices() {
+        assert_eq!(
+            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
+            vec![0, 1, 3, 4, 5]
+        );
+        // Result should be ordered, even if inputs are not
+        assert_eq!(
+            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
+            vec![0, 1, 3, 4, 5]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_is_sorted() {
+        assert!(is_sorted([0, 3, 4]));
+        assert!(is_sorted([0, 1, 2]));
+        assert!(is_sorted([0, 1, 4]));
+        assert!(is_sorted([0usize; 0]));
+        assert!(is_sorted([1, 2]));
+        assert!(!is_sorted([3, 2]));
+    }
+
+    #[tokio::test]
+    async fn test_set_difference() {
+        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
+        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);

Review Comment:
   it might be good to add a test for sets that are not ordered
   
   like
   
   ```suggestion
           assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
           assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
           assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
   ```



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok

Review Comment:
   ```suggestion
   # These tests check whether BoundedWindowAggExec treats partition by expressions as a set.
   # The physical plan shouldn't have a SortExec between the BoundedWindowAggExecs.
   
   
   statement ok
   ```



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -428,10 +434,19 @@ impl SortedPartitionByBoundedWindowStream {
         input: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
         partition_by_sort_keys: Vec<PhysicalSortExpr>,
-    ) -> Self {
+        ordered_partition_by_indices: Vec<usize>,
+    ) -> Result<Self> {
         let state = window_expr.iter().map(|_| IndexMap::new()).collect();
         let empty_batch = RecordBatch::new_empty(schema.clone());
-        Self {
+
+        // In BoundedWindowAggExec all partition by columns should be ordered.
+        if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() {
+            return Err(DataFusionError::Execution(

Review Comment:
   ```suggestion
               return Err(DataFusionError::Internal(
   ```



##########
datafusion/core/src/physical_optimizer/utils.rs:
##########
@@ -113,3 +175,49 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<RepartitionExec>()
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_find_indices() -> Result<()> {
+        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
+        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
+        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
+        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
+        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_merge_and_order_indices() {
+        assert_eq!(
+            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
+            vec![0, 1, 3, 4, 5]
+        );
+        // Result should be ordered, even if inputs are not
+        assert_eq!(
+            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
+            vec![0, 1, 3, 4, 5]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_is_sorted() {
+        assert!(is_sorted([0, 3, 4]));

Review Comment:
   maybe we should also add tests for `[]` and `[1]` (i.e. zero-length and one-element slices)



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -73,6 +75,8 @@ pub struct BoundedWindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Partition by indices that define ordering

Review Comment:
   Maybe putting this documentation on 
   
   ```
   pub(crate) fn get_ordered_partition_by_indices(
   ``` 
   
   and saying "see `get_ordered_partition_by_indices` for more details" would be a good structure



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -73,6 +75,8 @@ pub struct BoundedWindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Partition by indices that define ordering

Review Comment:
   I am still a little confused about what a partition by index that defines an ordering and one that does not define an ordering means. Maybe we could change the text to be more explicit 
   
   For example, is this the subset of the partition by expressions that are also part of the ORDER BY clause?
   
   Maybe we could offer an example of what indexes this set would be for a query like
   
   ```
   OVER (PARTITION BY a, b ORDER BY y, z)
   ```



##########
datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs:
##########
@@ -428,10 +434,19 @@ impl SortedPartitionByBoundedWindowStream {
         input: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
         partition_by_sort_keys: Vec<PhysicalSortExpr>,
-    ) -> Self {
+        ordered_partition_by_indices: Vec<usize>,
+    ) -> Result<Self> {
         let state = window_expr.iter().map(|_| IndexMap::new()).collect();
         let empty_batch = RecordBatch::new_empty(schema.clone());
-        Self {
+
+        // In BoundedWindowAggExec all partition by columns should be ordered.
+        if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() {
+            return Err(DataFusionError::Execution(

Review Comment:
   This should only happen if there is a bug, right? The error won't happen on poorly formed sql, is that right?



##########
datafusion/core/tests/sqllogictests/test_files/window.slt:
##########
@@ -2036,6 +2036,184 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@13 as sum1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum2]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC NULLS LAST]
+            CsvExec: files={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query III
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174
+63044568 5125627947 5125627947
+141047417 3650978969 3650978969
+141680161 8526017165 8526017165
+145294611 6802765992 6802765992
+
+# test_window_agg_partition_by_set2
+
+query TT
+EXPLAIN SELECT

Review Comment:
   Given that the code doesn't seem to depend on the actual window function or frame bounds (PRECEDING AND UNBOUNDED FOLLOWING, etc.), I wonder whether these additional cases add much coverage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org