You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/29 16:40:36 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4377: Refactor the Hash Join

alamb commented on code in PR #4377:
URL: https://github.com/apache/arrow-datafusion/pull/4377#discussion_r1034995370


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -2306,23 +2162,36 @@ mod tests {
         Ok(())
     }
 
+    fn build_semi_anti_left_table() -> Arc<dyn ExecutionPlan> {
+        // just two line match
+        // b1 = 10
+        build_table(
+            ("a1", &vec![1, 3, 5, 7, 9, 11, 13]),
+            ("b1", &vec![1, 3, 5, 7, 8, 8, 10]),
+            ("c1", &vec![10, 30, 50, 70, 90, 110, 130]),
+        )
+    }
+
+    fn build_semi_anti_right_table() -> Arc<dyn ExecutionPlan> {
+        // just two line match
+        // b2 = 10
+        build_table(
+            ("a2", &vec![2, 4, 6, 8, 10, 12]),
+            ("b2", &vec![2, 4, 6, 8, 10, 10]),

Review Comment:
   I recommend changing the order of the inputs so they are not sorted, to add additional coverage
   
   For example:
   
   ```suggestion
               ("b2", &vec![8, 10, 6, 2, 10, 4]),
   ```
   
   Bonus points for fuzzing and trying several different combinations



##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -1440,44 +1181,150 @@ fn equal_rows(
     err.unwrap_or(Ok(res))
 }
 
-// Produces a batch for left-side rows that have/have not been matched during the whole join
-fn produce_from_matched(
-    visited_left_side: &BooleanBufferBuilder,
-    schema: &SchemaRef,
-    column_indices: &[ColumnIndex],
-    left_data: &JoinLeftData,
-    unmatched: bool,
-) -> ArrowResult<RecordBatch> {
-    let indices = if unmatched {
-        UInt64Array::from_iter_values(
-            (0..visited_left_side.len())
-                .filter_map(|v| (!visited_left_side.get_bit(v)).then_some(v as u64)),
-        )
+// The input is the matched indices for left and right.
+// Adjust the indices according to the join type
+fn adjust_indices_by_join_type(
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    count_right_batch: usize,
+    join_type: JoinType,
+) -> (UInt64Array, UInt32Array) {
+    match join_type {
+        JoinType::Inner => {
+            // matched
+            (left_indices, right_indices)
+        }
+        JoinType::Left => {
+            // matched
+            (left_indices, right_indices)
+            // unmatched left row will be produced in the end of loop, and it has been set in the left visited bitmap
+        }
+        JoinType::Right | JoinType::Full => {
+            // matched
+            // unmatched right row will be produced in this batch
+            let right_unmatched_indices =
+                get_anti_indices(count_right_batch, &right_indices);
+            // combine the matched and unmatched right result together
+            append_right_indices(left_indices, right_indices, right_unmatched_indices)
+        }
+        JoinType::RightSemi => {
+            // need to remove the duplicated record in the right side
+            let right_indices = get_semi_indices(count_right_batch, &right_indices);
+            // the left_indices will not be used later for the `right semi` join
+            (left_indices, right_indices)
+        }
+        JoinType::RightAnti => {
+            // need to remove the duplicated record in the right side
+            // get the anti index for the right side
+            let right_indices = get_anti_indices(count_right_batch, &right_indices);
+            // the left_indices will not be used later for the `right anti` join
+            (left_indices, right_indices)
+        }
+        JoinType::LeftSemi | JoinType::LeftAnti => {
+            // matched or unmatched left row will be produced in the end of loop
+            // TODO: left semi can be optimized.
+            // When visit the right batch, we can output the matched left row and don't need to wait the end of loop
+            (
+                UInt64Array::from_iter_values(vec![]),
+                UInt32Array::from_iter_values(vec![]),
+            )
+        }
+    }
+}
+
+fn append_right_indices(
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    appended_right_indices: UInt32Array,
+) -> (UInt64Array, UInt32Array) {
+    // left_indices, right_indices and appended_right_indices must not contain the null value
+    if appended_right_indices.is_empty() {
+        (left_indices, right_indices)
     } else {
-        UInt64Array::from_iter_values(
-            (0..visited_left_side.len())
-                .filter_map(|v| (visited_left_side.get_bit(v)).then_some(v as u64)),
-        )
-    };
+        let matched_size = left_indices.len();
+        let unmatched_size = appended_right_indices.len();
+        let total_size = matched_size + unmatched_size;
+        // the new left indices: left_indices + null array
+        // the new right indices: right_indices + appended_right_indices
+        let new_left_indices = (0..total_size)
+            .map(|pos| {
+                if pos < matched_size {
+                    unsafe { Some(left_indices.value_unchecked(pos)) }
+                } else {
+                    None
+                }
+            })
+            .collect::<UInt64Array>();
+        let new_right_indices = (0..total_size)
+            .map(|pos| {
+                if pos < matched_size {
+                    unsafe { Some(right_indices.value_unchecked(pos)) }
+                } else {
+                    unsafe {
+                        Some(appended_right_indices.value_unchecked(pos - matched_size))
+                    }
+                }
+            })
+            .collect::<UInt32Array>();
+        (new_left_indices, new_right_indices)

Review Comment:
   I think you might be able to do this without unsafe and more concisely:
   
   ```suggestion
           let matched_size = left_indices.len();
           let unmatched_size = appended_right_indices.len();
           // the new left indices: left_indices + null array
           // the new right indices: right_indices + appended_right_indices
           let new_left_indices = left_indices
               .iter()
               .chain(std::iter::repeat(None).take(unmatched_size))
               .collect::<UInt64Array>();
           let new_right_indices = right_indices
               .iter()
               .chain(appended_right_indices.iter())
               .collect::<UInt32Array>();
           (new_left_indices, new_right_indices)
   ```



##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -792,225 +770,55 @@ fn build_join_indexes(
     let hashes_buffer = &mut vec![0; keys_values[0].len()];
     let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
     let left = &left_data.0;
+    // Using a buffer builder to avoid slower normal builder
+    let mut left_indices = UInt64BufferBuilder::new(0);
+    let mut right_indices = UInt32BufferBuilder::new(0);
 
-    match join_type {
-        JoinType::Inner | JoinType::LeftSemi | JoinType::LeftAnti => {
-            // Using a buffer builder to avoid slower normal builder
-            let mut left_indices = UInt64BufferBuilder::new(0);
-            let mut right_indices = UInt32BufferBuilder::new(0);
-
-            // Visit all of the right rows
-            for (row, hash_value) in hash_values.iter().enumerate() {
-                // Get the hash and find it in the build index
-
-                // For every item on the left and right we check if it matches
-                // This possibly contains rows with hash collisions,
-                // So we have to check here whether rows are equal or not
-                if let Some((_, indices)) =
-                    left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
-                {
-                    for &i in indices {
-                        // Check hash collisions
-                        if equal_rows(
-                            i as usize,
-                            row,
-                            &left_join_values,
-                            &keys_values,
-                            *null_equals_null,
-                        )? {
-                            left_indices.append(i);
-                            right_indices.append(row as u32);
-                        }
-                    }
-                }
-            }
-            let left = ArrayData::builder(DataType::UInt64)
-                .len(left_indices.len())
-                .add_buffer(left_indices.finish())
-                .build()
-                .unwrap();
-            let right = ArrayData::builder(DataType::UInt32)
-                .len(right_indices.len())
-                .add_buffer(right_indices.finish())
-                .build()
-                .unwrap();
-
-            Ok((
-                PrimitiveArray::<UInt64Type>::from(left),
-                PrimitiveArray::<UInt32Type>::from(right),
-            ))
-        }
-        JoinType::RightSemi => {

Review Comment:
   ❤️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org