You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/20 10:24:58 UTC

[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4219: [CBO] JoinSelection Rule, select HashJoin Partition Mode based on the Join Type and available statistics

jackwener commented on code in PR #4219:
URL: https://github.com/apache/arrow-datafusion/pull/4219#discussion_r1027263477


##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For hash joins with the partition mode [PartitionMode::Auto], the JoinSelection rule makes
+/// a cost-based decision to select which partition mode (Partitioned/CollectLeft) is optimal,
+/// based on the available statistics of the inputs.
+/// If the statistics are not available, the partition mode falls back to
+/// [PartitionMode::Partitioned].
+///
+/// The JoinSelection rule also reorders the build and probe sides of hash joins
+/// based on the available statistics of the inputs.
+/// The rule optimizes the order such that the left (build) side of the join is the smallest.
+/// If the statistics are not available, the order stays the same as in the original query.
+/// The JoinSelection rule also swaps the left and right sides of cross joins so that the
+/// left side is the smallest.
+#[derive(Default)]
+pub struct JoinSelection {}
+
+impl JoinSelection {
+    /// Creates a new [`JoinSelection`] instance.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// TODO: performance-test swapping Right Semi/Right joins into Left Semi/Left joins for the
+// case where the right side is smaller, but not by much.
+// TODO: in PrestoSQL the optimizer flips join sides only if one side is smaller than the
+// other by more than SIZE_DIFFERENCE_THRESHOLD times (8 times by default).
+/// Returns `true` when the available statistics say that the left input is strictly larger
+/// than the right input.
+///
+/// The inputs are compared by `total_byte_size` when both of them report it, otherwise by
+/// `num_rows`. If either side lacks the statistic being compared, the result is `false`.
+fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
+    // Query each input's statistics once and reuse the result for both the byte-size and
+    // the row-count comparison.
+    let left_stats = left.statistics();
+    let right_stats = right.statistics();
+
+    let sizes = match (left_stats.total_byte_size, right_stats.total_byte_size) {
+        (Some(l), Some(r)) => Some((l, r)),
+        // At least one byte size is missing: fall back to row counts on both sides.
+        _ => left_stats.num_rows.zip(right_stats.num_rows),
+    };
+
+    sizes.map_or(false, |(l, r)| l > r)
+}
+
+/// Returns `true` when the statistics of `plan` report a non-zero size that is strictly
+/// below `collection_size_threshold`.
+///
+/// `total_byte_size` is used when present; otherwise `num_rows` is compared against the
+/// same threshold. When neither statistic is available the result is `false`.
+fn supports_collect_by_size(
+    plan: &dyn ExecutionPlan,
+    collection_size_threshold: usize,
+) -> bool {
+    // Query the statistics once instead of once per branch.
+    let stats = plan.statistics();
+    // Currently we do not trust a 0 value from the stats, because stats collection might
+    // have a bug.
+    // TODO check the logic in datasource::get_statistics_with_limit()
+    // NOTE(review): the row-count fallback is compared against the same threshold as the
+    // byte size; confirm that this is intended.
+    stats
+        .total_byte_size
+        .or(stats.num_rows)
+        .map_or(false, |size| size != 0 && size < collection_size_threshold)
+}
+
+/// Reports whether the inputs of a join of the given type may be swapped.
+///
+/// Every variant listed here yields `true`. The match is kept exhaustive (no wildcard arm)
+/// so that adding a new `JoinType` variant forces this function to be revisited.
+fn supports_swap(join_type: JoinType) -> bool {
+    match join_type {
+        // Variants that `swap_join_type` maps to themselves.
+        JoinType::Inner | JoinType::Full => true,
+        // Variants that come in mirrored left/right pairs.
+        JoinType::Left | JoinType::Right => true,
+        JoinType::LeftSemi | JoinType::RightSemi => true,
+        JoinType::LeftAnti | JoinType::RightAnti => true,
+    }
+}
+
+/// Returns the join type that corresponds to `join_type` once the left and right inputs
+/// have been exchanged.
+///
+/// `Inner` and `Full` map to themselves; every `Left*` variant maps to its `Right*`
+/// counterpart and vice versa.
+fn swap_join_type(join_type: JoinType) -> JoinType {
+    use JoinType::{Full, Inner, Left, LeftAnti, LeftSemi, Right, RightAnti, RightSemi};
+
+    match join_type {
+        // Symmetric variants are unaffected by the swap.
+        Inner | Full => join_type,
+        // Outer joins.
+        Left => Right,
+        Right => Left,
+        // Semi joins.
+        LeftSemi => RightSemi,
+        RightSemi => LeftSemi,
+        // Anti joins.
+        LeftAnti => RightAnti,
+        RightAnti => LeftAnti,
+    }
+}
+
+fn swap_hash_join(
+    hash_join: &HashJoinExec,
+    partition_mode: PartitionMode,
+    left: &Arc<dyn ExecutionPlan>,
+    right: &Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let new_join = HashJoinExec::try_new(
+        Arc::clone(right),
+        Arc::clone(left),
+        hash_join
+            .on()
+            .iter()
+            .map(|(l, r)| (r.clone(), l.clone()))
+            .collect(),
+        swap_join_filter(hash_join.filter()),
+        &swap_join_type(*hash_join.join_type()),
+        partition_mode,
+        hash_join.null_equals_null(),
+    )?;
+    if matches!(
+        hash_join.join_type(),
+        JoinType::LeftSemi
+            | JoinType::RightSemi
+            | JoinType::LeftAnti
+            | JoinType::RightAnti
+    ) {
+        Ok(Arc::new(new_join))
+    } else {
+        // TODO avoid adding ProjectionExec again and again, only adding Final Projection

Review Comment:
   So careful! I have also run into this problem in Doris.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org