You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "mingmwang (via GitHub)" <gi...@apache.org> on 2023/02/07 12:17:53 UTC

[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5171: Make EnforceSorting global sort aware, fix sort mis-optimizations involving unions, support parallel sort + merge transformations

mingmwang commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1098584451


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -775,6 +1187,133 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort2 = sort_exec(sort_exprs.clone(), spm);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort3 = sort_exec(sort_exprs, spm2);
+        let physical_plan = repartition_exec(repartition_exec(sort3));
+
+        let expected_input = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "            SortExec: [non_nullable_col@1 ASC]",
+            "              MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "    MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort3() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+        let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source);
+        let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let repartition_exec = repartition_exec(spm);
+        let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+        let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+        let physical_plan = aggregate_exec(spm2);
+
+        // When removing a `SortPreservingMergeExec`, make sure that partitioning
+        // requirements are not violated. In some cases, we may need to replace
+        // it with a `CoalescePartitionsExec` instead of directly removing it.
+        let expected_input = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+            "          SortExec: [non_nullable_col@1 ASC]",
+            "            MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+
+        let expected_optimized = vec![
+            "AggregateExec: mode=Final, gby=[], aggr=[]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
+            "      MemoryExec: partitions=0, partition_sizes=[]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_do_not_remove_sort_with_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+        let limit = local_limit_exec(sort);
+        let limit = global_limit_exec(limit);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, limit]);
+        let repartition = repartition_exec(union);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, repartition);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      GlobalLimitExec: skip=0, fetch=100",
+            "        LocalLimitExec: fetch=100",
+            "          SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "            ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+
+        // We should keep the bottom `SortExec`.
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "      UnionExec",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "        GlobalLimitExec: skip=0, fetch=100",
+            "          LocalLimitExec: fetch=100",
+            "            SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "              ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     #[tokio::test]

Review Comment:
   Given the current bottom-up approach, I think it is quite hard to separate them without introducing duplicate code. At the very beginning they were actually separate: adding local sorts was done in the original `BasicEnforcement` rule, and sort removal/optimization was done in this rule.
   
   I have almost finished a top-down version of the `EnforceSorting` rule. Some unit tests fail with it, producing results that are not consistent with the current unit test results; I am not sure which results are the expected ones and need to check further. I believe the top-down approach is safer and more powerful: it can handle a case like the one below by adding only a single Sort('a', 'b', 'c'), with no need to remove redundant/unnecessary sorts added during the process.
   
   ```
   Top plan required('a', 'b', 'c')
        Middle plan required('a')
           Bottom plan required('a', 'b') 
   ```
   With the top-down approach, it is also safe to handle the reverse-ordering optimization for window expressions: the approach prefers to keep the topmost sort requirements and will not break the final plan's output ordering.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org