You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/11/21 19:41:59 UTC

(arrow-datafusion) branch main updated: [MINOR]: Remove unnecessary orderings from the final plan (#8289)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ffbc6896b0 [MINOR]: Remove unnecessary orderings from the final plan (#8289)
ffbc6896b0 is described below

commit ffbc6896b0f4f1b417991d1a13266be10c3f3709
Author: Mustafa Akur <10...@users.noreply.github.com>
AuthorDate: Tue Nov 21 22:41:53 2023 +0300

    [MINOR]: Remove unnecessary orderings from the final plan (#8289)
    
    * Remove lost orderings from the final plan
    
    * Improve comments
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <oz...@gmail.com>
---
 .../core/src/physical_optimizer/enforce_sorting.rs |  4 +++-
 datafusion/physical-plan/src/insert.rs             | 23 ++++++++--------------
 datafusion/sqllogictest/test_files/select.slt      | 23 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 2590948d3b..6fec74f608 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -476,7 +476,9 @@ fn ensure_sorting(
                     update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                 }
             }
-            (None, None) => {}
+            (None, None) => {
+                update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
+            }
         }
     }
     // For window expressions, we can remove some sorts when we can
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index 4eeb58974a..81cdfd753f 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -219,24 +219,17 @@ impl ExecutionPlan for FileSinkExec {
     }
 
     fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
-        // The input order is either exlicitly set (such as by a ListingTable),
-        // or require that the [FileSinkExec] gets the data in the order the
-        // input produced it (otherwise the optimizer may chose to reorder
-        // the input which could result in unintended / poor UX)
-        //
-        // More rationale:
-        // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
-        match &self.sort_order {
-            Some(requirements) => vec![Some(requirements.clone())],
-            None => vec![self
-                .input
-                .output_ordering()
-                .map(PhysicalSortRequirement::from_sort_exprs)],
-        }
+        // The required input ordering is set externally (e.g. by a `ListingTable`).
+        // Otherwise, there is no specific requirement (i.e. `sort_order` is `None`).
+        vec![self.sort_order.as_ref().cloned()]
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
-        vec![false]
+        // Maintains ordering in the sense that the written file will reflect
+        // the ordering of the input. For more context, see:
+        //
+        // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
+        vec![true]
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt
index 98ea061c73..bb81c5a9a1 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1013,6 +1013,29 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST]
 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
 
+# When the ordering is lost during projection, we shouldn't keep the SortExec
+# in the final physical plan.
+query TT
+EXPLAIN SELECT c2, COUNT(*)
+FROM (SELECT c2
+FROM aggregate_test_100
+ORDER BY c1, c2)
+GROUP BY c2;
+----
+logical_plan
+Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]]
+--Projection: aggregate_test_100.c2
+----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST
+------Projection: aggregate_test_100.c2, aggregate_test_100.c1
+--------TableScan: aggregate_test_100 projection=[c1, c2]
+physical_plan
+AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[COUNT(*)]
+--CoalesceBatchesExec: target_batch_size=8192
+----RepartitionExec: partitioning=Hash([c2@0], 2), input_partitions=2
+------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[COUNT(*)]
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2], has_header=true
+
 statement ok
 drop table annotated_data_finite2;