You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/27 13:05:44 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #24: [DataFusion] Optimize hash join inner workings, null handling fix

alamb commented on a change in pull request #24:
URL: https://github.com/apache/arrow-datafusion/pull/24#discussion_r621186963



##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -399,15 +395,23 @@ fn update_hash(
         .map(|name| Ok(col(name).evaluate(batch)?.into_array(batch.num_rows())))
         .collect::<Result<Vec<_>>>()?;
 
-    // update the hash map
+    // calculate the hash values
     let hash_values = create_hashes(&keys_values, &random_state, hashes_buffer)?;
 
     // insert hashes to key of the hashmap
     for (row, hash_value) in hash_values.iter().enumerate() {
-        hash.raw_entry_mut()
-            .from_key_hashed_nocheck(*hash_value, hash_value)
-            .and_modify(|_, v| v.push((row + offset) as u64))
-            .or_insert_with(|| (*hash_value, smallvec![(row + offset) as u64]));
+        match hash.raw_entry_mut().from_hash(*hash_value, |_| true) {
+            hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => {
+                entry.get_mut().push((row + offset) as u64);
+            }
+            hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
+                entry.insert_hashed_nocheck(

Review comment:
       Does this code do the right thing if there are collisions in the hash table (aka the same key column values hash to the same hash value?) I am not enough of an expert on HashBrown to really know, but something feels off to me here. 
   

##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -65,7 +65,7 @@ use log::debug;
 // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
 // E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
 // As the key is a hash value, we need to check possible hash collisions in the probe stage
-type JoinHashMap = HashMap<u64, SmallVec<[u64; 1]>, IdHashBuilder>;
+type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>;

Review comment:
       I am a little confused about using `()` as the key type for the `HashMap` -- maybe updating the comments would help

##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -755,21 +766,75 @@ fn equal_rows(
 }
 
 macro_rules! hash_array {
-    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident) => {
+    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
         let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
         if array.null_count() == 0 {
-            for (i, hash) in $hashes.iter_mut().enumerate() {
-                *hash =
-                    combine_hashes($ty::get_hash(&array.value(i), $random_state), *hash);
-            }
-        } else {
-            for (i, hash) in $hashes.iter_mut().enumerate() {
-                if !array.is_null(i) {
+            if $multi_col {
+                for (i, hash) in $hashes.iter_mut().enumerate() {
                     *hash = combine_hashes(
                         $ty::get_hash(&array.value(i), $random_state),
                         *hash,
                     );
                 }
+            } else {
+                for (i, hash) in $hashes.iter_mut().enumerate() {
+                    *hash = $ty::get_hash(&array.value(i), $random_state);
+                }
+            }
+        } else {
+            if $multi_col {
+                for (i, hash) in $hashes.iter_mut().enumerate() {
+                    if !array.is_null(i) {
+                        *hash = combine_hashes(
+                            $ty::get_hash(&array.value(i), $random_state),
+                            *hash,
+                        );
+                    }
+                }
+            } else {
+                for (i, hash) in $hashes.iter_mut().enumerate() {
+                    if !array.is_null(i) {
+                        *hash = $ty::get_hash(&array.value(i), $random_state);
+                    }
+                }
+            }
+        }
+    };
+}
+
+macro_rules! hash_array_primitive {
+    ($array_type:ident, $column: ident, $ty: ident, $hashes: ident, $random_state: ident, $multi_col: ident) => {
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+        let values = array.values();
+
+        if array.null_count() == 0 {
+            if $multi_col {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = combine_hashes($ty::get_hash(value, $random_state), *hash);
+                }
+            } else {
+                for (hash, value) in $hashes.iter_mut().zip(values.iter()) {
+                    *hash = $ty::get_hash(value, $random_state)
+                }
+            }
+        } else {
+            if $multi_col {

Review comment:
       special casing multi-value join columns is a good one

##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -708,7 +720,6 @@ macro_rules! equal_rows_elem {
         let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap();
 
         match (left_array.is_null($left), left_array.is_null($right)) {
-            (true, true) => true,

Review comment:
       The fact we are now worrying about and fixing null handling is a sign, in my mind, of DataFusion's maturation 🤣 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org