You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/28 18:43:52 UTC
[arrow-datafusion] 01/01: Bucketed hash join
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch bucketing
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 6aad5e2790a9339361ea3ac3b4afb8aa7a396fb8
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Wed Jun 28 20:43:40 2023 +0200
Bucketed hash join
---
.../core/src/physical_plan/joins/hash_join.rs | 70 +++++++---------------
.../src/physical_plan/joins/hash_join_utils.rs | 5 +-
testing | 2 +-
3 files changed, 27 insertions(+), 50 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index a3c553c9b3..672c7dedd6 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -571,26 +571,17 @@ pub fn update_hash(
let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
// insert hashes to key of the hashmap
- for (row, hash_value) in hash_values.iter().enumerate() {
- let item = hash_map
- .map
- .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
- if let Some((_, index)) = item {
- // Already exists: add index to next array
- let prev_index = *index;
- // Store new value inside hashmap
- *index = (row + offset + 1) as u64;
+ if hash_map.map.len() > 0 {
+ for (row, hash_value) in hash_values.iter().enumerate() {
+ let bucket = (*hash_value % (hash_map.map.len() as u64)) as usize;
+
+ let prev_index = hash_map.map[bucket];
+
+ hash_map.map[bucket] = (row + offset + 1) as u64;
+
// Update chained Vec at row + offset with previous value
hash_map.next[row + offset] = prev_index;
- } else {
- hash_map.map.insert(
- *hash_value,
- // store the value + 1 as 0 value reserved for end of list
- (*hash_value, (row + offset + 1) as u64),
- |(hash, _)| *hash,
- );
- // chained list at (row + offset) is already initialized with 0
- // meaning end of list
+
}
}
Ok(())
@@ -696,27 +687,22 @@ pub fn build_equal_condition_join_indices(
let mut build_indices = UInt64BufferBuilder::new(0);
let mut probe_indices = UInt32BufferBuilder::new(0);
// Visit all of the probe rows
- for (row, hash_value) in hash_values.iter().enumerate() {
- // Get the hash and find it in the build index
-
- // For every item on the build and probe we check if it matches
- // This possibly contains rows with hash collisions,
- // So we have to check here whether rows are equal or not
- if let Some((_, index)) = build_hashmap
- .map
- .get(*hash_value, |(hash, _)| *hash_value == *hash)
- {
- let mut i = *index - 1;
- loop {
+ if build_hashmap.map.len() > 0 {
+ for (row, hash_value) in hash_values.iter().enumerate() {
+ // Get the hash and find it in the build index
+
+ let bucket = (*hash_value % (build_hashmap.map.len() as u64)) as usize;
+
+ // For every item on the build and probe we check if it matches
+ // This possibly contains rows with hash collisions,
+ // So we have to check here whether rows are equal or not
+ let mut next = build_hashmap.map[bucket];
+ while next != 0 {
+ let i = next - 1;
build_indices.append(i);
probe_indices.append(row as u32);
// Follow the chain to get the next index value
- let next = build_hashmap.next[i as usize];
- if next == 0 {
- // end of list
- break;
- }
- i = next - 1;
+ next = build_hashmap.next[i as usize];
}
}
}
@@ -1310,7 +1296,6 @@ mod tests {
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::Literal;
- use hashbrown::raw::RawTable;
use crate::execution::context::SessionConfig;
use crate::physical_expr::expressions::BinaryExpr;
@@ -1321,7 +1306,6 @@ mod tests {
physical_plan::{
common,
expressions::Column,
- hash_utils::create_hashes,
joins::{hash_join::build_equal_condition_join_indices, utils::JoinSide},
memory::MemoryExec,
repartition::RepartitionExec,
@@ -2651,7 +2635,6 @@ mod tests {
#[test]
fn join_with_hash_collision() -> Result<()> {
- let mut hashmap_left = RawTable::with_capacity(2);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
@@ -2659,13 +2642,6 @@ mod tests {
);
let random_state = RandomState::with_seeds(0, 0, 0, 0);
- let hashes_buff = &mut vec![0; left.num_rows()];
- let hashes =
- create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
-
- // Create hash collisions (same hashes)
- hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
- hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
let next = vec![2, 0];
@@ -2677,7 +2653,7 @@ mod tests {
let left_data = (
JoinHashMap {
- map: hashmap_left,
+ map: vec![1],
next,
},
left,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 1b9cbd543d..b3ac1f392b 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -91,7 +91,7 @@ use datafusion_common::Result;
// https://github.com/apache/arrow-datafusion/issues/50
pub struct JoinHashMap {
// Stores hash value to first index
- pub map: RawTable<(u64, u64)>,
+ pub map: Vec<u64>,
// Stores indices in chained list data structure
pub next: Vec<u64>,
}
@@ -103,7 +103,8 @@ pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
impl JoinHashMap {
pub(crate) fn with_capacity(capacity: usize) -> Self {
JoinHashMap {
- map: RawTable::with_capacity(capacity),
+ // Overallocate using 2 x the buckets
+ map: vec![0; capacity * 2],
next: vec![0; capacity],
}
}
diff --git a/testing b/testing
index e81d0c6de3..5bab2f264a 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88