You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/20 09:16:28 UTC

[arrow-datafusion] branch vectorized_collision updated: Vectorized implementation

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch vectorized_collision
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/vectorized_collision by this push:
     new f0dec5fc73 Vectorized implementation
f0dec5fc73 is described below

commit f0dec5fc73970f61910dda37cf781f48aa1cbd6b
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Tue Jun 20 11:16:20 2023 +0200

    Vectorized implementation
---
 .../core/src/physical_plan/joins/hash_join.rs      | 92 +++++++++++++---------
 1 file changed, 53 insertions(+), 39 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 6c2bc95b1b..ad92dc0835 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -25,7 +25,7 @@ use arrow::array::{
     StringArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array,
     UInt8Array,
 };
-use arrow::compute::{eq_dyn, take};
+use arrow::compute::{and, eq_dyn, filter, take};
 use arrow::datatypes::{ArrowNativeType, DataType};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
@@ -43,6 +43,7 @@ use arrow::{
     },
     util::bit_util,
 };
+use arrow_array::cast::downcast_array;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 use std::fmt;
 use std::mem::size_of;
@@ -726,39 +727,41 @@ pub fn build_equal_condition_join_indices(
     let mut build_indices = UInt64BufferBuilder::new(0);
     let mut probe_indices = UInt32BufferBuilder::new(0);
 
-    let mut to_check: (Vec<usize>, Vec<u64>) = hash_values
-        .iter()
-        .enumerate()
-        .flat_map(|(row, hash_value)| {
-            build_hashmap
-                .map
-                .get(*hash_value, |(hash, _)| *hash_value == *hash)
-                .map(|(_, v)| (row, *v - 1))
-        })
-        .unzip();
-
-    while to_check.0.len() > 0 {
-        // Perform column-wise (vectorized) equality check
-
-        let res =
-            equal_rows_arr(to_check.0, &build_join_values, &keys_values, null_equals_null)?;
+    for (row, hash_value) in hash_values.iter().enumerate() {
+        // Get the hash and find it in the build index
 
-        // check next items
-        to_check = to_check
-            .0
-            .iter()
-            .zip(to_check.1)
-            .flat_map(|(row, index)| {
-                let next = build_hashmap.next[index as usize];
-                (next != 0).then(|| (*row, next - 1))
-            })
-            .unzip();
+        // For every item on the build and probe we check if it matches
+        // This possibly contains rows with hash collisions,
+        // So we have to check here whether rows are equal or not
+        if let Some((_, index)) = build_hashmap
+            .map
+            .get(*hash_value, |(hash, _)| *hash_value == *hash)
+        {
+            let mut i = *index - 1;
+            loop {
+                build_indices.append(i);
+                probe_indices.append(row as u32);
+                // Follow the chain to get the next index value
+                let next = build_hashmap.next[i as usize];
+                if next == 0 {
+                    // end of list
+                    break;
+                }
+                i = next - 1;
+            }
+        }
     }
 
-    Ok((
-        PrimitiveArray::new(build_indices.finish().into(), None),
-        PrimitiveArray::new(probe_indices.finish().into(), None),
-    ))
+    let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None);
+    let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None);
+
+    equal_rows_arr(
+        left,
+        right,
+        &build_join_values,
+        &keys_values,
+        null_equals_null,
+    )
 }
 
 macro_rules! equal_rows_elem {
@@ -1094,18 +1097,29 @@ pub fn equal_rows(
 }
 
 pub fn equal_rows_arr(
-    indices: Vec<u64>,
-    indices_right: Vec<usize>,
+    indices: UInt64Array,
+    indices_right: UInt32Array,
     left_arrays: &[ArrayRef],
     right_arrays: &[ArrayRef],
-    null_equals_null: bool,
-) -> Result<BooleanArray> {
-    let arr_left = take(left_arrays[0].as_ref(), indices.into(), None);
-    let arr_right = take(left_arrays[0].as_ref(), indices, None);
+    _null_equals_null: bool,
+) -> Result<(UInt64Array, UInt32Array)> {
+    let arr_left = take(left_arrays[0].as_ref(), &indices, None)?;
+    let arr_right = take(right_arrays[0].as_ref(), &indices_right, None)?;
+
+    let mut equal = eq_dyn(arr_left.as_ref(), arr_right.as_ref())?;
+
+    for i in 1..left_arrays.len() {
+        let equal2 = eq_dyn(left_arrays[i].as_ref(), right_arrays[i].as_ref())?;
+        equal = and(&equal, &equal2)?;
+    }
 
-    let equal = eq_dyn(left_arrays[0].as_ref(), right_arrays[0].as_ref())?;
+    let left_filtered = filter(&indices, &equal)?;
+    let right_filtered = filter(&indices_right, &equal)?;
 
-    Ok(equal)
+    Ok((
+        downcast_array(left_filtered.as_ref()),
+        downcast_array(right_filtered.as_ref()),
+    ))
 }
 
 impl HashJoinStream {