You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/15 14:45:16 UTC

[arrow-datafusion] branch adapt_datastructure created (now 763c24d6c8)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a change to branch adapt_datastructure
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


      at 763c24d6c8 Change HashJoin datastructure

This branch includes the following new commits:

     new 763c24d6c8 Change HashJoin datastructure

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[arrow-datafusion] 01/01: Change HashJoin datastructure

Posted by dh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch adapt_datastructure
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 763c24d6c8658809ebba724e9835e3efda9862b5
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Thu Jun 15 16:45:05 2023 +0200

    Change HashJoin datastructure
---
 .../core/src/physical_plan/joins/hash_join.rs      | 41 ++++++++++++++--------
 .../src/physical_plan/joins/hash_join_utils.rs     | 18 ++++++----
 .../src/physical_plan/joins/symmetric_hash_join.rs | 28 +++++++++++++--
 3 files changed, 62 insertions(+), 25 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 0e62540d6d..f097e913d8 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -43,8 +43,6 @@ use arrow::{
     util::bit_util,
 };
 use futures::{ready, Stream, StreamExt, TryStreamExt};
-use hashbrown::raw::RawTable;
-use smallvec::smallvec;
 use std::fmt;
 use std::sync::Arc;
 use std::task::Poll;
@@ -518,9 +516,9 @@ async fn collect_left_input(
     reservation.try_grow(estimated_hastable_size)?;
     metrics.build_mem_used.add(estimated_hastable_size);
 
-    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
+    let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();
-    let mut offset = 0;
+    let mut offset = 1;
     for batch in batches.iter() {
         hashes_buffer.clear();
         hashes_buffer.resize(batch.num_rows(), 0);
@@ -565,14 +563,20 @@ pub fn update_hash(
         let item = hash_map
             .0
             .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
-        if let Some((_, indices)) = item {
-            indices.push((row + offset) as u64);
+        if let Some((_, index)) = item {
+            // Already exists: add index to next array
+            let prev_index = *index;
+            *index = (row + offset) as u64;
+            // update chained Vec
+            hash_map.1[*index as usize] = prev_index;
+
         } else {
             hash_map.0.insert(
                 *hash_value,
-                (*hash_value, smallvec![(row + offset) as u64]),
+                (*hash_value, (row + offset) as u64),
                 |(hash, _)| *hash,
             );
+            // chained list is initialized with 0
         }
     }
     Ok(())
@@ -727,13 +731,13 @@ pub fn build_equal_condition_join_indices(
         // For every item on the build and probe we check if it matches
         // This possibly contains rows with hash collisions,
         // So we have to check here whether rows are equal or not
-        if let Some((_, indices)) = build_hashmap
+        if let Some((_, index)) = build_hashmap
             .0
             .get(*hash_value, |(hash, _)| *hash_value == *hash)
         {
-            for &i in indices {
-                // Check hash collisions
-                let offset_build_index = i as usize - offset_value;
+            let mut i = *index;
+            loop {
+                let offset_build_index = i as usize - offset_value - 1;
                 // Check hash collisions
                 if equal_rows(
                     offset_build_index,
@@ -745,6 +749,11 @@ pub fn build_equal_condition_join_indices(
                     build_indices.append(offset_build_index as u64);
                     probe_indices.append(row as u32);
                 }
+                if build_hashmap.1[i as usize] != 0 {
+                    i = build_hashmap.1[i as usize];
+                } else {
+                    break;
+                }
             }
         }
     }
@@ -1258,11 +1267,11 @@ mod tests {
 
     use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
     use arrow::datatypes::{DataType, Field, Schema};
-    use smallvec::smallvec;
 
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::Literal;
+    use hashbrown::raw::RawTable;
 
     use crate::execution::context::SessionConfig;
     use crate::physical_expr::expressions::BinaryExpr;
@@ -2616,8 +2625,10 @@ mod tests {
             create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
 
         // Create hash collisions (same hashes)
-        hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
-        hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
+        hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
+        hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
+
+        let next = vec![0, 2, 0];
 
         let right = build_table_i32(
             ("a", &vec![10, 20]),
@@ -2625,7 +2636,7 @@ mod tests {
             ("c", &vec![30, 40]),
         );
 
-        let left_data = (JoinHashMap(hashmap_left), left);
+        let left_data = (JoinHashMap(hashmap_left, next), left);
         let (l, r) = build_equal_condition_join_indices(
             &left_data.0,
             &left_data.1,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 992de86dfe..59db05164d 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -30,13 +30,13 @@ use datafusion_physical_expr::intervals::Interval;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use hashbrown::raw::RawTable;
-use smallvec::SmallVec;
 
 use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
 use datafusion_common::Result;
 
 // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
-//
+// The indices (values) are stored in a separate chained list based on (index, next).
+// The first item in the list is reserved.
 // Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
 // to put the indices in a certain bucket.
 // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
@@ -47,9 +47,13 @@ use datafusion_common::Result;
 // but the values don't match. Those are checked in the [equal_rows] macro
 // TODO: speed up collision check and move away from using a hashbrown HashMap
 // https://github.com/apache/arrow-datafusion/issues/50
-pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
+pub struct JoinHashMap(pub RawTable<(u64, u64)>, pub Vec<u64>);
 
 impl JoinHashMap {
+    pub(crate) fn with_capacity(capacity: usize) -> JoinHashMap {
+        JoinHashMap(RawTable::with_capacity(capacity), vec![0; capacity + 1])
+    } 
+
     /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
     /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
     /// when necessary. You can adjust the scale_factor value to achieve the desired
@@ -67,10 +71,11 @@ impl JoinHashMap {
             let new_capacity = (capacity * (scale_factor - 1)) / scale_factor;
             self.0.shrink_to(new_capacity, |(hash, _)| *hash)
         }
+        // todo handle chained list
     }
 
     pub(crate) fn size(&self) -> usize {
-        self.0.allocation_info().1.size()
+        self.0.allocation_info().1.size() + self.1.capacity() * 16 + 16
     }
 }
 
@@ -290,7 +295,6 @@ pub mod tests {
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{binary, cast, col, lit};
-    use smallvec::smallvec;
     use std::sync::Arc;
 
     /// Filter expr for a + b > c + 10 AND a + b < c + 100
@@ -628,14 +632,14 @@ pub mod tests {
     #[test]
     fn test_shrink_if_necessary() {
         let scale_factor = 4;
-        let mut join_hash_map = JoinHashMap(RawTable::with_capacity(100));
+        let mut join_hash_map = JoinHashMap::with_capacity(100);
         let data_size = 2000;
         let deleted_part = 3 * data_size / 4;
         // Add elements to the JoinHashMap
         for hash_value in 0..data_size {
             join_hash_map.0.insert(
                 hash_value,
-                (hash_value, smallvec![hash_value]),
+                (hash_value, hash_value),
                 |(hash, _)| *hash,
             );
         }
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 7eac619687..80ea9e65dc 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -702,8 +702,30 @@ fn prune_hash_values(
             .0
             .get_mut(*hash_value, |(hash, _)| hash_value == hash)
         {
-            separation_chain.retain(|n| !index_set.contains(n));
-            if separation_chain.is_empty() {
+            let mut size = 0;
+            let mut i = separation_chain;
+            let mut prev_i = i;
+
+            let mut keep = false;
+
+            // TODO
+            // loop {
+            //     if !index_set.contains(i) {
+            //         if !keep {
+            //             *prev_i = i;
+            //         }
+            //         // retain this value
+            //         keep = true;
+            //         size += 1;
+            //     }
+            //     // drop value
+            //     *prev_i = i;
+
+            //     if *i == 0 {
+            //         break;
+            //     }
+            // }
+            if size == 0 {
                 hashmap
                     .0
                     .remove_entry(*hash_value, |(hash, _)| hash_value == hash);
@@ -1076,7 +1098,7 @@ impl OneSideHashJoiner {
             build_side,
             input_buffer: RecordBatch::new_empty(schema),
             on,
-            hashmap: JoinHashMap(RawTable::with_capacity(0)),
+            hashmap: JoinHashMap::with_capacity(0),
             row_hash_values: VecDeque::new(),
             hashes_buffer: vec![],
             visited_rows: HashSet::new(),