Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/28 18:43:52 UTC

[arrow-datafusion] 01/01: Bucketed hash join

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch bucketing
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 6aad5e2790a9339361ea3ac3b4afb8aa7a396fb8
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Wed Jun 28 20:43:40 2023 +0200

    Bucketed hash join
---
 .../core/src/physical_plan/joins/hash_join.rs      | 70 +++++++---------------
 .../src/physical_plan/joins/hash_join_utils.rs     |  5 +-
 testing                                            |  2 +-
 3 files changed, 27 insertions(+), 50 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index a3c553c9b3..672c7dedd6 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -571,26 +571,17 @@ pub fn update_hash(
     let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
 
     // insert hashes to key of the hashmap
-    for (row, hash_value) in hash_values.iter().enumerate() {
-        let item = hash_map
-            .map
-            .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
-        if let Some((_, index)) = item {
-            // Already exists: add index to next array
-            let prev_index = *index;
-            // Store new value inside hashmap
-            *index = (row + offset + 1) as u64;
+    if hash_map.map.len() > 0 {
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            let bucket = (*hash_value % (hash_map.map.len() as u64)) as usize;
+
+            let prev_index = hash_map.map[bucket];
+
+            hash_map.map[bucket] = (row + offset + 1) as u64;
+            
             // Update chained Vec at row + offset with previous value
             hash_map.next[row + offset] = prev_index;
-        } else {
-            hash_map.map.insert(
-                *hash_value,
-                // store the value + 1 as 0 value reserved for end of list
-                (*hash_value, (row + offset + 1) as u64),
-                |(hash, _)| *hash,
-            );
-            // chained list at (row + offset) is already initialized with 0
-            // meaning end of list
+
         }
     }
     Ok(())
@@ -696,27 +687,22 @@ pub fn build_equal_condition_join_indices(
     let mut build_indices = UInt64BufferBuilder::new(0);
     let mut probe_indices = UInt32BufferBuilder::new(0);
     // Visit all of the probe rows
-    for (row, hash_value) in hash_values.iter().enumerate() {
-        // Get the hash and find it in the build index
-
-        // For every item on the build and probe we check if it matches
-        // This possibly contains rows with hash collisions,
-        // So we have to check here whether rows are equal or not
-        if let Some((_, index)) = build_hashmap
-            .map
-            .get(*hash_value, |(hash, _)| *hash_value == *hash)
-        {
-            let mut i = *index - 1;
-            loop {
+    if build_hashmap.map.len() > 0 {
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            // Get the hash and find it in the build index
+
+            let bucket =  (*hash_value % (build_hashmap.map.len() as u64)) as usize;
+
+            // For every item on the build and probe we check if it matches
+            // This possibly contains rows with hash collisions,
+            // So we have to check here whether rows are equal or not
+            let mut next = build_hashmap.map[bucket];
+            while next != 0 {
+                let i = next - 1;
                 build_indices.append(i);
                 probe_indices.append(row as u32);
                 // Follow the chain to get the next index value
-                let next = build_hashmap.next[i as usize];
-                if next == 0 {
-                    // end of list
-                    break;
-                }
-                i = next - 1;
+                next = build_hashmap.next[i as usize];
             }
         }
     }
@@ -1310,7 +1296,6 @@ mod tests {
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::Literal;
-    use hashbrown::raw::RawTable;
 
     use crate::execution::context::SessionConfig;
     use crate::physical_expr::expressions::BinaryExpr;
@@ -1321,7 +1306,6 @@ mod tests {
         physical_plan::{
             common,
             expressions::Column,
-            hash_utils::create_hashes,
             joins::{hash_join::build_equal_condition_join_indices, utils::JoinSide},
             memory::MemoryExec,
             repartition::RepartitionExec,
@@ -2651,7 +2635,6 @@ mod tests {
 
     #[test]
     fn join_with_hash_collision() -> Result<()> {
-        let mut hashmap_left = RawTable::with_capacity(2);
         let left = build_table_i32(
             ("a", &vec![10, 20]),
             ("x", &vec![100, 200]),
@@ -2659,13 +2642,6 @@ mod tests {
         );
 
         let random_state = RandomState::with_seeds(0, 0, 0, 0);
-        let hashes_buff = &mut vec![0; left.num_rows()];
-        let hashes =
-            create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
-
-        // Create hash collisions (same hashes)
-        hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
-        hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
 
         let next = vec![2, 0];
 
@@ -2677,7 +2653,7 @@ mod tests {
 
         let left_data = (
             JoinHashMap {
-                map: hashmap_left,
+                map: vec![1],
                 next,
             },
             left,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 1b9cbd543d..b3ac1f392b 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -91,7 +91,7 @@ use datafusion_common::Result;
 // https://github.com/apache/arrow-datafusion/issues/50
 pub struct JoinHashMap {
     // Stores hash value to first index
-    pub map: RawTable<(u64, u64)>,
+    pub map: Vec<u64>,
     // Stores indices in chained list data structure
     pub next: Vec<u64>,
 }
@@ -103,7 +103,8 @@ pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
 impl JoinHashMap {
     pub(crate) fn with_capacity(capacity: usize) -> Self {
         JoinHashMap {
-            map: RawTable::with_capacity(capacity),
+            // Overallocate using 2 x the buckets
+            map: vec![0; capacity * 2],
             next: vec![0; capacity],
         }
     }
diff --git a/testing b/testing
index e81d0c6de3..5bab2f264a 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
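
The core of this change swaps the RawTable-based JoinHashMap for a plain Vec<u64> of buckets plus the chained `next` vector: `map[hash % map.len()]` holds the 1-based index of the most recently inserted build row for that bucket, `next[row]` points at the previously inserted row in the same bucket, and 0 marks the end of a chain. Below is a minimal standalone sketch of that scheme, under assumed names (BucketedChainMap and its methods are illustrative, not the DataFusion API). As the retained comments in build_equal_condition_join_indices note, distinct hashes can land in the same bucket, so the join still has to verify real key equality on the candidate rows afterwards.

// Illustrative sketch only; hypothetical names, not the DataFusion API.
struct BucketedChainMap {
    map: Vec<u64>,  // bucket -> 1-based index of the head row, 0 = empty bucket
    next: Vec<u64>, // row -> 1-based index of the previously inserted row, 0 = end of chain
}

impl BucketedChainMap {
    fn with_capacity(rows: usize) -> Self {
        Self {
            // Overallocate roughly 2x buckets (as the commit does) to keep chains short;
            // max(1) just avoids a modulo-by-zero in this sketch when rows == 0.
            map: vec![0; rows.max(1) * 2],
            next: vec![0; rows],
        }
    }

    fn insert(&mut self, row: usize, hash: u64) {
        let bucket = (hash % self.map.len() as u64) as usize;
        // Prepend the row to the bucket's chain: remember the old head, then
        // store row + 1 (0 is reserved for "end of list").
        self.next[row] = self.map[bucket];
        self.map[bucket] = (row + 1) as u64;
    }

    /// Returns the 0-based build-row indices whose hash fell into the same bucket.
    /// Callers must still check key equality, since different hashes can collide here.
    fn candidates(&self, hash: u64) -> Vec<usize> {
        let bucket = (hash % self.map.len() as u64) as usize;
        let mut out = Vec::new();
        let mut head = self.map[bucket];
        while head != 0 {
            let row = (head - 1) as usize;
            out.push(row);
            head = self.next[row];
        }
        out
    }
}

fn main() {
    let mut m = BucketedChainMap::with_capacity(3);
    m.insert(0, 42);
    m.insert(1, 7);
    m.insert(2, 42); // same hash as row 0: chained into the same bucket
    // Candidates come back newest-first: [2, 0]
    println!("{:?}", m.candidates(42));
}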