You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this copy.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/02/11 13:19:17 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5228: Feature/sort enforcement refactor

alamb commented on code in PR #5228:
URL: https://github.com/apache/arrow-datafusion/pull/5228#discussion_r1103625562


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -1776,6 +1745,47 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_coalesce_propagate() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let repartition = repartition_exec(source);
+        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(repartition));
+        let repartition = repartition_exec(coalesce_partitions);
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        // Add local sort
+        let sort = Arc::new(SortExec::new_with_partitioning(
+            sort_exprs.clone(),
+            repartition,
+            true,
+            None,
+        )) as _;
+        let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
+        let sort = sort_exec(sort_exprs, spm);
+
+        let physical_plan = sort.clone();
+        // Sort Parallelize rule should end Coalesce + Sort linkage when Sort is Global Sort
+        // Also input plan is not valid as it is. We need to add SortExec before SortPreservingMergeExec.
+        let expected_input = vec![
+            "SortExec: [nullable_col@0 ASC]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        CoalescePartitionsExec",
+            "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  SortExec: [nullable_col@0 ASC]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "        MemoryExec: partitions=0, partition_sizes=[]",

Review Comment:
   This plan definitely looks better than the input: the redundant top-level `SortExec` and the `CoalescePartitionsExec` are both removed.



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -1718,8 +1687,8 @@ mod tests {
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  FilterExec: NOT non_nullable_col@1",
-            "    SortExec: [nullable_col@0 ASC]",
+            "  SortExec: [nullable_col@0 ASC]",
+            "    FilterExec: NOT non_nullable_col@1",

Review Comment:
   This plan change looks better to me as well: filtering now happens before sorting, so the sort has fewer rows to process.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org