You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/20 09:16:28 UTC
[arrow-datafusion] branch vectorized_collision updated: Vectorized implementation
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch vectorized_collision
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/vectorized_collision by this push:
new f0dec5fc73 Vectorized implementation
f0dec5fc73 is described below
commit f0dec5fc73970f61910dda37cf781f48aa1cbd6b
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Tue Jun 20 11:16:20 2023 +0200
Vectorized implementation
---
.../core/src/physical_plan/joins/hash_join.rs | 92 +++++++++++++---------
1 file changed, 53 insertions(+), 39 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 6c2bc95b1b..ad92dc0835 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -25,7 +25,7 @@ use arrow::array::{
StringArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
};
-use arrow::compute::{eq_dyn, take};
+use arrow::compute::{and, eq_dyn, filter, take};
use arrow::datatypes::{ArrowNativeType, DataType};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
@@ -43,6 +43,7 @@ use arrow::{
},
util::bit_util,
};
+use arrow_array::cast::downcast_array;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use std::fmt;
use std::mem::size_of;
@@ -726,39 +727,41 @@ pub fn build_equal_condition_join_indices(
let mut build_indices = UInt64BufferBuilder::new(0);
let mut probe_indices = UInt32BufferBuilder::new(0);
- let mut to_check: (Vec<usize>, Vec<u64>) = hash_values
- .iter()
- .enumerate()
- .flat_map(|(row, hash_value)| {
- build_hashmap
- .map
- .get(*hash_value, |(hash, _)| *hash_value == *hash)
- .map(|(_, v)| (row, *v - 1))
- })
- .unzip();
-
- while to_check.0.len() > 0 {
- // Perform column-wise (vectorized) equality check
-
- let res =
- equal_rows_arr(to_check.0, &build_join_values, &keys_values, null_equals_null)?;
+ for (row, hash_value) in hash_values.iter().enumerate() {
+ // Get the hash and find it in the build index
- // check next items
- to_check = to_check
- .0
- .iter()
- .zip(to_check.1)
- .flat_map(|(row, index)| {
- let next = build_hashmap.next[index as usize];
- (next != 0).then(|| (*row, next - 1))
- })
- .unzip();
+ // For every item on the build and probe we check if it matches
+ // This possibly contains rows with hash collisions,
+ // So we have to check here whether rows are equal or not
+ if let Some((_, index)) = build_hashmap
+ .map
+ .get(*hash_value, |(hash, _)| *hash_value == *hash)
+ {
+ let mut i = *index - 1;
+ loop {
+ build_indices.append(i);
+ probe_indices.append(row as u32);
+ // Follow the chain to get the next index value
+ let next = build_hashmap.next[i as usize];
+ if next == 0 {
+ // end of list
+ break;
+ }
+ i = next - 1;
+ }
+ }
}
- Ok((
- PrimitiveArray::new(build_indices.finish().into(), None),
- PrimitiveArray::new(probe_indices.finish().into(), None),
- ))
+ let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None);
+ let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None);
+
+ equal_rows_arr(
+ left,
+ right,
+ &build_join_values,
+ &keys_values,
+ null_equals_null,
+ )
}
macro_rules! equal_rows_elem {
@@ -1094,18 +1097,29 @@ pub fn equal_rows(
}
pub fn equal_rows_arr(
- indices: Vec<u64>,
- indices_right: Vec<usize>,
+ indices: UInt64Array,
+ indices_right: UInt32Array,
left_arrays: &[ArrayRef],
right_arrays: &[ArrayRef],
- null_equals_null: bool,
-) -> Result<BooleanArray> {
- let arr_left = take(left_arrays[0].as_ref(), indices.into(), None);
- let arr_right = take(left_arrays[0].as_ref(), indices, None);
+ _null_equals_null: bool,
+) -> Result<(UInt64Array, UInt32Array)> {
+ let arr_left = take(left_arrays[0].as_ref(), &indices, None)?;
+ let arr_right = take(right_arrays[0].as_ref(), &indices_right, None)?;
+
+ let mut equal = eq_dyn(arr_left.as_ref(), arr_right.as_ref())?;
+
+ for i in 1..left_arrays.len() {
+ let equal2 = eq_dyn(left_arrays[i].as_ref(), right_arrays[i].as_ref())?;
+ equal = and(&equal, &equal2)?;
+ }
- let equal = eq_dyn(left_arrays[0].as_ref(), right_arrays[0].as_ref())?;
+ let left_filtered = filter(&indices, &equal)?;
+ let right_filtered = filter(&indices_right, &equal)?;
- Ok(equal)
+ Ok((
+ downcast_array(left_filtered.as_ref()),
+ downcast_array(right_filtered.as_ref()),
+ ))
}
impl HashJoinStream {