Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/15 20:22:54 UTC

[GitHub] [arrow-datafusion] isidentical commented on a diff in pull request #4219: [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the available statistics

isidentical commented on code in PR #4219:
URL: https://github.com/apache/arrow-datafusion/pull/4219#discussion_r1023215447


##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -932,15 +933,41 @@ impl DefaultPhysicalPlanner {
 
                     if session_state.config.target_partitions > 1
                         && session_state.config.repartition_joins
+                        && !session_state.config.prefer_hash_join
                     {
-                        // Use hash partition by default to parallelize hash joins
+                        // Use SortMergeJoin if hash join is not preferred
+                        // Sort-Merge join support currently is experimental
+                        if join_filter.is_some() {
+                            // TODO SortMergeJoinExec need to support join filter
+                            Err(DataFusionError::Plan("SortMergeJoinExec does not support join_filter now.".to_string()))

Review Comment:
   ```suggestion
                               Err(DataFusionError::NotImplemented("SortMergeJoinExec does not support join_filter now.".to_string()))
   ```



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -68,6 +75,19 @@ fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -
     }
 }
 
+fn supports_collect_by_size(
+    plan: &dyn ExecutionPlan,
+    collection_size_threshold: usize,
+) -> bool {
+    if let Some(size) = plan.statistics().total_byte_size {
+        size < collection_size_threshold
+    } else if let Some(row_count) = plan.statistics().num_rows {
+        row_count < collection_size_threshold

Review Comment:
   Question: are we treating bytes and rows equally here? `collection_size_threshold` appears to be in bytes, while `row_count` is a number of rows. If we want to keep this fallback, we could normalize the threshold (e.g. `collection_size_threshold / SOME_MAGIC_CONSTANT`); otherwise the comparison may end up well off from the desired scenario.
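
   A rough sketch of that normalization, not a definitive implementation. `Stats` is a hypothetical stand-in for the two statistics fields used above, `ASSUMED_AVG_ROW_BYTES` is a made-up value playing the role of `SOME_MAGIC_CONSTANT`, and returning `false` when neither statistic is available is an assumption, since the rest of the function is not shown in this hunk:

   ```rust
   /// Hypothetical average row width in bytes (an assumption, not a measured value).
   /// Plays the role of `SOME_MAGIC_CONSTANT` from the comment above.
   const ASSUMED_AVG_ROW_BYTES: usize = 64;

   /// Hypothetical stand-in for the two statistics fields used in the diff.
   struct Stats {
       total_byte_size: Option<usize>,
       num_rows: Option<usize>,
   }

   /// Returns true if the input looks small enough to collect on one side.
   /// `threshold_bytes` is always interpreted as a byte count.
   fn supports_collect_by_size(stats: &Stats, threshold_bytes: usize) -> bool {
       if let Some(size) = stats.total_byte_size {
           // Byte size is known: compare bytes against bytes.
           size < threshold_bytes
       } else if let Some(rows) = stats.num_rows {
           // Only the row count is known: convert the byte threshold into an
           // approximate row threshold so the units match.
           rows < threshold_bytes / ASSUMED_AVG_ROW_BYTES
       } else {
           // Assumption: without any statistics, do not collect.
           false
       }
   }
   ```

   With a 1024-byte threshold, an input that only reports 100 rows would pass the unnormalized check (`100 < 1024`) but fail this one (`100 < 1024 / 64`), which is the kind of mismatch I am worried about.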



##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -348,7 +349,15 @@ impl ExecutionPlan for SortMergeJoinExec {
     }
 
     fn statistics(&self) -> Statistics {
-        todo!()
+        // TODO stats: it is not possible in general to know the output size of joins
+        // There are some special cases though, for example:
+        // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
+        estimate_join_statistics(

Review Comment:
   Really glad that this can also be used in other places 💯 



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -150,66 +207,139 @@ fn swap_join_filter(filter: &Option<JoinFilter>) -> Option<JoinFilter> {
     }
 }
 
-impl PhysicalOptimizerRule for HashBuildProbeOrder {
+impl PhysicalOptimizerRule for JoinSelection {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
         session_config: &SessionConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let plan = optimize_children(self, plan, session_config)?;
-        if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
-            let left = hash_join.left();
-            let right = hash_join.right();
+        let collect_left_threshold = session_config.hash_join_collect_left_threshold;
+        plan.transform_up(&|plan| {
+            if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+                if matches!(hash_join.partition_mode(), PartitionMode::Auto) {
+                    try_collect_left(hash_join, collect_left_threshold)
+                        .unwrap()
+                        .or_else(|| Some(partitioned_hash_join(hash_join).unwrap()))
+                } else {
+                    let left = hash_join.left();
+                    let right = hash_join.right();
+                    if should_swap_join_order(&**left, &**right)
+                        && supports_swap(*hash_join.join_type())
+                    {
+                        Some(
+                            swap_hash_join(
+                                hash_join,
+                                *hash_join.partition_mode(),
+                                left,
+                                right,
+                            )
+                            .unwrap(),
+                        )
+                    } else {
+                        None
+                    }
+                }
+            } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>()
+            {
+                let left = cross_join.left();
+                let right = cross_join.right();
+                if should_swap_join_order(&**left, &**right) {
+                    let new_join =
+                        CrossJoinExec::try_new(Arc::clone(right), Arc::clone(left))
+                            .unwrap();
+                    // TODO avoid adding ProjectionExec again and again, only adding Final Projection
+                    let proj = ProjectionExec::try_new(
+                        swap_reverting_projection(&left.schema(), &right.schema()),
+                        Arc::new(new_join),
+                    )
+                    .unwrap();
+                    Some(Arc::new(proj))
+                } else {
+                    None
+                }
+            } else {
+                None
+            }
+        })
+    }
+
+    fn name(&self) -> &str {
+        "[CBO]join_selection"
+    }
+}
+
+fn try_collect_left(

Review Comment:
   Would it make sense to describe the logic here for each of the different scenarios? (I was a bit lost until the end, trying to figure out each state and how it should behave.)


