Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/28 18:43:51 UTC

[arrow-datafusion] branch bucketing created (now 6aad5e2790)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch bucketing
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


      at 6aad5e2790 Bucketed hash join

This branch includes the following new commits:

     new 6aad5e2790 Bucketed hash join

The 1 revision listed above as "new" is entirely new to this
repository and is described in a separate email below.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow-datafusion] 01/01: Bucketed hash join

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch bucketing
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 6aad5e2790a9339361ea3ac3b4afb8aa7a396fb8
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Wed Jun 28 20:43:40 2023 +0200

    Bucketed hash join
---
 .../core/src/physical_plan/joins/hash_join.rs      | 70 +++++++---------------
 .../src/physical_plan/joins/hash_join_utils.rs     |  5 +-
 testing                                            |  2 +-
 3 files changed, 27 insertions(+), 50 deletions(-)
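
In short, the patch drops the hashbrown RawTable behind JoinHashMap in
favor of a plain Vec<u64> of buckets: a hash picks its bucket by modulo,
the bucket stores the most recent row index + 1 (0 is reserved as the
end-of-list sentinel), and rows that land in the same bucket are chained
through the existing next vector. Below is a minimal standalone sketch of
that scheme; the map/next fields mirror the patch, but the insert/probe
helpers and the driver are illustrative stand-ins, not the committed code.

    // Sketch of the bucketed chained-list hash map the diff introduces.
    // The fields mirror the patched JoinHashMap; insert/probe are
    // hypothetical helpers added here only for illustration.
    struct JoinHashMap {
        // map[bucket] holds (row index + 1) of the last row inserted
        // into the bucket; 0 is the reserved end-of-list sentinel.
        map: Vec<u64>,
        // next[row] chains to the previous row in the same bucket.
        next: Vec<u64>,
    }

    impl JoinHashMap {
        fn with_capacity(capacity: usize) -> Self {
            JoinHashMap {
                // Overallocate 2x the buckets, as the patch does.
                map: vec![0; capacity * 2],
                next: vec![0; capacity],
            }
        }

        fn insert(&mut self, row: usize, hash_value: u64) {
            let bucket = (hash_value % self.map.len() as u64) as usize;
            // Prepend the row to the bucket's chain.
            self.next[row] = self.map[bucket];
            self.map[bucket] = (row + 1) as u64;
        }

        // Returns every row chained under the hash's bucket. Distinct
        // hashes can share a bucket, so callers must still verify key
        // equality on the candidates, exactly as the join code does
        // after collecting the candidate indices.
        fn probe(&self, hash_value: u64) -> Vec<u64> {
            let bucket = (hash_value % self.map.len() as u64) as usize;
            let mut matches = Vec::new();
            let mut next = self.map[bucket];
            while next != 0 {
                let i = next - 1;
                matches.push(i);
                next = self.next[i as usize];
            }
            matches
        }
    }

    fn main() {
        let mut hm = JoinHashMap::with_capacity(3); // 6 buckets
        hm.insert(0, 7);  // 7 % 6 == 1
        hm.insert(1, 13); // 13 % 6 == 1, chains ahead of row 0
        hm.insert(2, 2);  // 2 % 6 == 2
        assert_eq!(hm.probe(7), vec![1, 0]); // newest first
        assert_eq!(hm.probe(2), vec![2]);
    }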

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index a3c553c9b3..672c7dedd6 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -571,26 +571,17 @@ pub fn update_hash(
     let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
 
     // insert hashes to key of the hashmap
-    for (row, hash_value) in hash_values.iter().enumerate() {
-        let item = hash_map
-            .map
-            .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
-        if let Some((_, index)) = item {
-            // Already exists: add index to next array
-            let prev_index = *index;
-            // Store new value inside hashmap
-            *index = (row + offset + 1) as u64;
+    if hash_map.map.len() > 0 {
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            let bucket = (*hash_value % (hash_map.map.len() as u64)) as usize;
+
+            let prev_index = hash_map.map[bucket];
+
+            hash_map.map[bucket] = (row + offset + 1) as u64;
+
             // Update chained Vec at row + offset with previous value
             hash_map.next[row + offset] = prev_index;
-        } else {
-            hash_map.map.insert(
-                *hash_value,
-                // store the value + 1 as 0 value reserved for end of list
-                (*hash_value, (row + offset + 1) as u64),
-                |(hash, _)| *hash,
-            );
-            // chained list at (row + offset) is already initialized with 0
-            // meaning end of list
+
         }
     }
     Ok(())
@@ -696,27 +687,22 @@ pub fn build_equal_condition_join_indices(
     let mut build_indices = UInt64BufferBuilder::new(0);
     let mut probe_indices = UInt32BufferBuilder::new(0);
     // Visit all of the probe rows
-    for (row, hash_value) in hash_values.iter().enumerate() {
-        // Get the hash and find it in the build index
-
-        // For every item on the build and probe we check if it matches
-        // This possibly contains rows with hash collisions,
-        // So we have to check here whether rows are equal or not
-        if let Some((_, index)) = build_hashmap
-            .map
-            .get(*hash_value, |(hash, _)| *hash_value == *hash)
-        {
-            let mut i = *index - 1;
-            loop {
+    if build_hashmap.map.len() > 0 {
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            // Get the hash and find it in the build index
+
+            let bucket = (*hash_value % (build_hashmap.map.len() as u64)) as usize;
+
+            // For every item on the build and probe we check if it matches
+            // This possibly contains rows with hash collisions,
+            // So we have to check here whether rows are equal or not
+            let mut next = build_hashmap.map[bucket];
+            while next != 0 {
+                let i = next - 1;
                 build_indices.append(i);
                 probe_indices.append(row as u32);
                 // Follow the chain to get the next index value
-                let next = build_hashmap.next[i as usize];
-                if next == 0 {
-                    // end of list
-                    break;
-                }
-                i = next - 1;
+                next = build_hashmap.next[i as usize];
             }
         }
     }
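
Note that probing no longer filters on hash equality the way the
RawTable get() did: every row chained under the bucket becomes a
candidate pair, including rows whose hashes merely share a bucket, and
the row-equality check that runs on the collected indices is what weeds
those out. The loop is also inverted to test the 0 sentinel up front, so
never-populated buckets fall through without a separate miss path.
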
@@ -1310,7 +1296,6 @@ mod tests {
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::Literal;
-    use hashbrown::raw::RawTable;
 
     use crate::execution::context::SessionConfig;
     use crate::physical_expr::expressions::BinaryExpr;
@@ -1321,7 +1306,6 @@ mod tests {
         physical_plan::{
             common,
             expressions::Column,
-            hash_utils::create_hashes,
             joins::{hash_join::build_equal_condition_join_indices, utils::JoinSide},
             memory::MemoryExec,
             repartition::RepartitionExec,
@@ -2651,7 +2635,6 @@ mod tests {
 
     #[test]
     fn join_with_hash_collision() -> Result<()> {
-        let mut hashmap_left = RawTable::with_capacity(2);
         let left = build_table_i32(
             ("a", &vec![10, 20]),
             ("x", &vec![100, 200]),
@@ -2659,13 +2642,6 @@ mod tests {
         );
 
         let random_state = RandomState::with_seeds(0, 0, 0, 0);
-        let hashes_buff = &mut vec![0; left.num_rows()];
-        let hashes =
-            create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
-
-        // Create hash collisions (same hashes)
-        hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
-        hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
 
         let next = vec![2, 0];
 
@@ -2677,7 +2653,7 @@ mod tests {
 
         let left_data = (
             JoinHashMap {
-                map: hashmap_left,
+                map: vec![1],
                 next,
             },
             left,
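
With the bucketed map, the collision test above no longer needs
RawTable or create_hashes to manufacture colliding entries: a
single-bucket map makes every hash reduce to bucket 0 (h % 1 == 0), so
map: vec![1] points at row 0 and the pre-existing next: vec![2, 0]
chains row 0 to row 1, forcing both build rows through the row-equality
check just as the old colliding hashes did.
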
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 1b9cbd543d..b3ac1f392b 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -91,7 +91,7 @@ use datafusion_common::Result;
 // https://github.com/apache/arrow-datafusion/issues/50
 pub struct JoinHashMap {
     // Stores hash value to first index
-    pub map: RawTable<(u64, u64)>,
+    pub map: Vec<u64>,
     // Stores indices in chained list data structure
     pub next: Vec<u64>,
 }
@@ -103,7 +103,8 @@ pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
 impl JoinHashMap {
     pub(crate) fn with_capacity(capacity: usize) -> Self {
         JoinHashMap {
-            map: RawTable::with_capacity(capacity),
+            // Overallocate using 2 x the buckets
+            map: vec![0; capacity * 2],
             next: vec![0; capacity],
         }
     }
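
Sizing map at 2 * capacity buckets for capacity rows caps the average
load factor at 0.5, keeping the chains through next short in the common
case; and since the bucket vector never resizes, a collision costs only
extra chain hops rather than the rehashing a RawTable may incur.
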
diff --git a/testing b/testing
index e81d0c6de3..5bab2f264a 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e
+Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88