You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/08/07 05:55:08 UTC

[arrow-datafusion] branch master updated: Use `RawTable` API in hash join (#827)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c0c062  Use `RawTable`  API in hash join (#827)
2c0c062 is described below

commit 2c0c06248667bfeb9c56a4c2119b3a7994b9fc1f
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Sat Aug 7 07:55:04 2021 +0200

    Use `RawTable`  API in hash join (#827)
    
    * Use rawtable API
    
    * Avoid changes
    
    * Check on hash again
    
    * Test fix
---
 datafusion/src/physical_plan/hash_join.rs | 107 ++++++++++--------------------
 1 file changed, 36 insertions(+), 71 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 1a174bb..1a57c40 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -29,8 +29,8 @@ use arrow::{
     datatypes::{UInt32Type, UInt64Type},
 };
 use smallvec::{smallvec, SmallVec};
+use std::sync::Arc;
 use std::{any::Any, usize};
-use std::{hash::Hasher, sync::Arc};
 use std::{time::Instant, vec};
 
 use async_trait::async_trait;
@@ -49,6 +49,8 @@ use arrow::array::{
     UInt64Array, UInt8Array,
 };
 
+use hashbrown::raw::RawTable;
+
 use super::expressions::Column;
 use super::hash_utils::create_hashes;
 use super::{
@@ -65,6 +67,7 @@ use super::{
 use crate::physical_plan::coalesce_batches::concat_batches;
 use crate::physical_plan::{PhysicalExpr, SQLMetric};
 use log::debug;
+use std::fmt;
 
 // Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
 //
@@ -78,7 +81,14 @@ use log::debug;
 // but the values don't match. Those are checked in the [equal_rows] macro
 // TODO: speed up collission check and move away from using a hashbrown HashMap
 // https://github.com/apache/arrow-datafusion/issues/50
-type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>;
+struct JoinHashMap(RawTable<(u64, SmallVec<[u64; 1]>)>);
+
+impl fmt::Debug for JoinHashMap {
+    fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        Ok(())
+    }
+}
+
 type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>;
 
 /// join execution plan executes partitions in parallel and combines them into a set of
@@ -303,10 +313,8 @@ impl ExecutionPlan for HashJoinExec {
                                     Ok(acc)
                                 })
                                 .await?;
-                            let mut hashmap = JoinHashMap::with_capacity_and_hasher(
-                                num_rows,
-                                IdHashBuilder {},
-                            );
+                            let mut hashmap =
+                                JoinHashMap(RawTable::with_capacity(num_rows));
                             let mut hashes_buffer = Vec::new();
                             let mut offset = 0;
                             for batch in batches.iter() {
@@ -358,8 +366,7 @@ impl ExecutionPlan for HashJoinExec {
                             Ok(acc)
                         })
                         .await?;
-                    let mut hashmap =
-                        JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {});
+                    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
                     let mut hashes_buffer = Vec::new();
                     let mut offset = 0;
                     for batch in batches.iter() {
@@ -460,7 +467,7 @@ impl ExecutionPlan for HashJoinExec {
 fn update_hash(
     on: &[Column],
     batch: &RecordBatch,
-    hash: &mut JoinHashMap,
+    hash_map: &mut JoinHashMap,
     offset: usize,
     random_state: &RandomState,
     hashes_buffer: &mut Vec<u64>,
@@ -476,18 +483,18 @@ fn update_hash(
 
     // insert hashes to key of the hashmap
     for (row, hash_value) in hash_values.iter().enumerate() {
-        match hash.raw_entry_mut().from_hash(*hash_value, |_| true) {
-            hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => {
-                entry.get_mut().push((row + offset) as u64);
-            }
-            hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
-                entry.insert_hashed_nocheck(
-                    *hash_value,
-                    (),
-                    smallvec![(row + offset) as u64],
-                );
-            }
-        };
+        let item = hash_map
+            .0
+            .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+        if let Some((_, indices)) = item {
+            indices.push((row + offset) as u64);
+        } else {
+            hash_map.0.insert(
+                *hash_value,
+                (*hash_value, smallvec![(row + offset) as u64]),
+                |(hash, _)| *hash,
+            );
+        }
     }
     Ok(())
 }
@@ -678,7 +685,7 @@ fn build_join_indexes(
                 // This possibly contains rows with hash collisions,
                 // So we have to check here whether rows are equal or not
                 if let Some((_, indices)) =
-                    left.raw_entry().from_hash(*hash_value, |_| true)
+                    left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
                 {
                     for &i in indices {
                         // Check hash collisions
@@ -710,7 +717,7 @@ fn build_join_indexes(
             // First visit all of the rows
             for (row, hash_value) in hash_values.iter().enumerate() {
                 if let Some((_, indices)) =
-                    left.raw_entry().from_hash(*hash_value, |_| true)
+                    left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
                 {
                     for &i in indices {
                         // Collision check
@@ -728,7 +735,7 @@ fn build_join_indexes(
             let mut right_indices = UInt32Builder::new(0);
 
             for (row, hash_value) in hash_values.iter().enumerate() {
-                match left.raw_entry().from_hash(*hash_value, |_| true) {
+                match left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) {
                     Some((_, indices)) => {
                         for &i in indices {
                             if equal_rows(
@@ -755,38 +762,6 @@ fn build_join_indexes(
         }
     }
 }
-use core::hash::BuildHasher;
-
-/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing
-/// it when inserting/indexing or regrowing the `HashMap`
-struct IdHasher {
-    hash: u64,
-}
-
-impl Hasher for IdHasher {
-    fn finish(&self) -> u64 {
-        self.hash
-    }
-
-    fn write_u64(&mut self, i: u64) {
-        self.hash = i;
-    }
-
-    fn write(&mut self, _bytes: &[u8]) {
-        unreachable!("IdHasher should only be used for u64 keys")
-    }
-}
-
-#[derive(Debug)]
-struct IdHashBuilder {}
-
-impl BuildHasher for IdHashBuilder {
-    type Hasher = IdHasher;
-
-    fn build_hasher(&self) -> Self::Hasher {
-        IdHasher { hash: 0 }
-    }
-}
 
 macro_rules! equal_rows_elem {
     ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {{
@@ -1776,7 +1751,7 @@ mod tests {
 
     #[test]
     fn join_with_hash_collision() -> Result<()> {
-        let mut hashmap_left = HashMap::with_capacity_and_hasher(2, IdHashBuilder {});
+        let mut hashmap_left = RawTable::with_capacity(2);
         let left = build_table_i32(
             ("a", &vec![10, 20]),
             ("x", &vec![100, 200]),
@@ -1788,19 +1763,9 @@ mod tests {
         let hashes =
             create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
 
-        // Create hash collisions
-        match hashmap_left.raw_entry_mut().from_hash(hashes[0], |_| true) {
-            hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
-                entry.insert_hashed_nocheck(hashes[0], (), smallvec![0, 1])
-            }
-            _ => unreachable!("Hash should not be vacant"),
-        };
-        match hashmap_left.raw_entry_mut().from_hash(hashes[1], |_| true) {
-            hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
-                entry.insert_hashed_nocheck(hashes[1], (), smallvec![0, 1])
-            }
-            _ => unreachable!("Hash should not be vacant"),
-        };
+        // Create hash collisions (same hashes)
+        hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
+        hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
 
         let right = build_table_i32(
             ("a", &vec![10, 20]),
@@ -1808,7 +1773,7 @@ mod tests {
             ("c", &vec![30, 40]),
         );
 
-        let left_data = JoinLeftData::new((hashmap_left, left));
+        let left_data = JoinLeftData::new((JoinHashMap(hashmap_left), left));
         let (l, r) = build_join_indexes(
             &left_data,
             &right,