You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/15 14:45:17 UTC

[arrow-datafusion] 01/01: Change HashJoin datastructure

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch adapt_datastructure
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 763c24d6c8658809ebba724e9835e3efda9862b5
Author: Daniƫl Heres <da...@coralogix.com>
AuthorDate: Thu Jun 15 16:45:05 2023 +0200

    Change HashJoin datastructure
---
 .../core/src/physical_plan/joins/hash_join.rs      | 41 ++++++++++++++--------
 .../src/physical_plan/joins/hash_join_utils.rs     | 18 ++++++----
 .../src/physical_plan/joins/symmetric_hash_join.rs | 28 +++++++++++++--
 3 files changed, 62 insertions(+), 25 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 0e62540d6d..f097e913d8 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -43,8 +43,6 @@ use arrow::{
     util::bit_util,
 };
 use futures::{ready, Stream, StreamExt, TryStreamExt};
-use hashbrown::raw::RawTable;
-use smallvec::smallvec;
 use std::fmt;
 use std::sync::Arc;
 use std::task::Poll;
@@ -518,9 +516,9 @@ async fn collect_left_input(
     reservation.try_grow(estimated_hastable_size)?;
     metrics.build_mem_used.add(estimated_hastable_size);
 
-    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
+    let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();
-    let mut offset = 0;
+    let mut offset = 1;
     for batch in batches.iter() {
         hashes_buffer.clear();
         hashes_buffer.resize(batch.num_rows(), 0);
@@ -565,14 +563,20 @@ pub fn update_hash(
         let item = hash_map
             .0
             .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
-        if let Some((_, indices)) = item {
-            indices.push((row + offset) as u64);
+        if let Some((_, index)) = item {
+            // Already exists: add index to next array
+            let prev_index = *index;
+            *index = (row + offset) as u64;
+            // update chained Vec
+            hash_map.1[*index as usize] = prev_index;
+
         } else {
             hash_map.0.insert(
                 *hash_value,
-                (*hash_value, smallvec![(row + offset) as u64]),
+                (*hash_value, (row + offset) as u64),
                 |(hash, _)| *hash,
             );
+            // chained list is initalized with 0
         }
     }
     Ok(())
@@ -727,13 +731,13 @@ pub fn build_equal_condition_join_indices(
         // For every item on the build and probe we check if it matches
         // This possibly contains rows with hash collisions,
         // So we have to check here whether rows are equal or not
-        if let Some((_, indices)) = build_hashmap
+        if let Some((_, index)) = build_hashmap
             .0
             .get(*hash_value, |(hash, _)| *hash_value == *hash)
         {
-            for &i in indices {
-                // Check hash collisions
-                let offset_build_index = i as usize - offset_value;
+            let mut i = *index;
+            loop {
+                let offset_build_index = i as usize - offset_value - 1;
                 // Check hash collisions
                 if equal_rows(
                     offset_build_index,
@@ -745,6 +749,11 @@ pub fn build_equal_condition_join_indices(
                     build_indices.append(offset_build_index as u64);
                     probe_indices.append(row as u32);
                 }
+                if build_hashmap.1[i as usize] != 0 {
+                    i = build_hashmap.1[i as usize];
+                } else {
+                    break;
+                }
             }
         }
     }
@@ -1258,11 +1267,11 @@ mod tests {
 
     use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
     use arrow::datatypes::{DataType, Field, Schema};
-    use smallvec::smallvec;
 
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::Literal;
+    use hashbrown::raw::RawTable;
 
     use crate::execution::context::SessionConfig;
     use crate::physical_expr::expressions::BinaryExpr;
@@ -2616,8 +2625,10 @@ mod tests {
             create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
 
         // Create hash collisions (same hashes)
-        hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
-        hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
+        hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
+        hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
+
+        let next = vec![0, 2, 0];
 
         let right = build_table_i32(
             ("a", &vec![10, 20]),
@@ -2625,7 +2636,7 @@ mod tests {
             ("c", &vec![30, 40]),
         );
 
-        let left_data = (JoinHashMap(hashmap_left), left);
+        let left_data = (JoinHashMap(hashmap_left, next), left);
         let (l, r) = build_equal_condition_join_indices(
             &left_data.0,
             &left_data.1,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 992de86dfe..59db05164d 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -30,13 +30,13 @@ use datafusion_physical_expr::intervals::Interval;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use hashbrown::raw::RawTable;
-use smallvec::SmallVec;
 
 use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
 use datafusion_common::Result;
 
 // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
-//
+// The indices (values) are stored in a separate chained list based on (index, next).
+// The first item in the list is reserved.
 // Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
 // to put the indices in a certain bucket.
 // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
@@ -47,9 +47,13 @@ use datafusion_common::Result;
 // but the values don't match. Those are checked in the [equal_rows] macro
 // TODO: speed up collision check and move away from using a hashbrown HashMap
 // https://github.com/apache/arrow-datafusion/issues/50
-pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
+pub struct JoinHashMap(pub RawTable<(u64, u64)>, pub Vec<u64>);
 
 impl JoinHashMap {
+    pub(crate) fn with_capacity(capacity: usize) -> JoinHashMap {
+        JoinHashMap(RawTable::with_capacity(capacity), vec![0; capacity + 1])
+    } 
+
     /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
     /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
     /// when necessary. You can adjust the scale_factor value to achieve the desired
@@ -67,10 +71,11 @@ impl JoinHashMap {
             let new_capacity = (capacity * (scale_factor - 1)) / scale_factor;
             self.0.shrink_to(new_capacity, |(hash, _)| *hash)
         }
+        // todo handle chained list
     }
 
     pub(crate) fn size(&self) -> usize {
-        self.0.allocation_info().1.size()
+        self.0.allocation_info().1.size() + self.1.capacity() * 16 + 16
     }
 }
 
@@ -290,7 +295,6 @@ pub mod tests {
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{binary, cast, col, lit};
-    use smallvec::smallvec;
     use std::sync::Arc;
 
     /// Filter expr for a + b > c + 10 AND a + b < c + 100
@@ -628,14 +632,14 @@ pub mod tests {
     #[test]
     fn test_shrink_if_necessary() {
         let scale_factor = 4;
-        let mut join_hash_map = JoinHashMap(RawTable::with_capacity(100));
+        let mut join_hash_map = JoinHashMap::with_capacity(100);
         let data_size = 2000;
         let deleted_part = 3 * data_size / 4;
         // Add elements to the JoinHashMap
         for hash_value in 0..data_size {
             join_hash_map.0.insert(
                 hash_value,
-                (hash_value, smallvec![hash_value]),
+                (hash_value, hash_value),
                 |(hash, _)| *hash,
             );
         }
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 7eac619687..80ea9e65dc 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -702,8 +702,30 @@ fn prune_hash_values(
             .0
             .get_mut(*hash_value, |(hash, _)| hash_value == hash)
         {
-            separation_chain.retain(|n| !index_set.contains(n));
-            if separation_chain.is_empty() {
+            let mut size = 0;
+            let mut i = separation_chain;
+            let mut prev_i = i;
+
+            let mut keep = false;
+
+            // TODO
+            // loop {
+            //     if !index_set.contains(i) {
+            //         if !keep {
+            //             *prev_i = i;
+            //         }
+            //         // retain this value
+            //         keep = true;
+            //         size += 1;
+            //     }
+            //     // drop value
+            //     *prev_i = i;
+
+            //     if *i == 0 {
+            //         break;
+            //     }
+            // }
+            if size == 0 {
                 hashmap
                     .0
                     .remove_entry(*hash_value, |(hash, _)| hash_value == hash);
@@ -1076,7 +1098,7 @@ impl OneSideHashJoiner {
             build_side,
             input_buffer: RecordBatch::new_empty(schema),
             on,
-            hashmap: JoinHashMap(RawTable::with_capacity(0)),
+            hashmap: JoinHashMap::with_capacity(0),
             row_hash_values: VecDeque::new(),
             hashes_buffer: vec![],
             visited_rows: HashSet::new(),