You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/08/07 05:55:08 UTC
[arrow-datafusion] branch master updated: Use `RawTable` API in hash join (#827)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2c0c062 Use `RawTable` API in hash join (#827)
2c0c062 is described below
commit 2c0c06248667bfeb9c56a4c2119b3a7994b9fc1f
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Sat Aug 7 07:55:04 2021 +0200
Use `RawTable` API in hash join (#827)
* Use rawtable API
* Avoid changes
* Check on hash again
* Test fix
---
datafusion/src/physical_plan/hash_join.rs | 107 ++++++++++--------------------
1 file changed, 36 insertions(+), 71 deletions(-)
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 1a174bb..1a57c40 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -29,8 +29,8 @@ use arrow::{
datatypes::{UInt32Type, UInt64Type},
};
use smallvec::{smallvec, SmallVec};
+use std::sync::Arc;
use std::{any::Any, usize};
-use std::{hash::Hasher, sync::Arc};
use std::{time::Instant, vec};
use async_trait::async_trait;
@@ -49,6 +49,8 @@ use arrow::array::{
UInt64Array, UInt8Array,
};
+use hashbrown::raw::RawTable;
+
use super::expressions::Column;
use super::hash_utils::create_hashes;
use super::{
@@ -65,6 +67,7 @@ use super::{
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::{PhysicalExpr, SQLMetric};
use log::debug;
+use std::fmt;
// Maps a `u64` hash value based on the left ["on" values] to a list of indices with this key's value.
//
@@ -78,7 +81,14 @@ use log::debug;
// but the values don't match. Those are checked in the [equal_rows] macro
// TODO: speed up collission check and move away from using a hashbrown HashMap
// https://github.com/apache/arrow-datafusion/issues/50
-type JoinHashMap = HashMap<(), SmallVec<[u64; 1]>, IdHashBuilder>;
+struct JoinHashMap(RawTable<(u64, SmallVec<[u64; 1]>)>);
+
+impl fmt::Debug for JoinHashMap {
+ fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ Ok(())
+ }
+}
+
type JoinLeftData = Arc<(JoinHashMap, RecordBatch)>;
/// join execution plan executes partitions in parallel and combines them into a set of
@@ -303,10 +313,8 @@ impl ExecutionPlan for HashJoinExec {
Ok(acc)
})
.await?;
- let mut hashmap = JoinHashMap::with_capacity_and_hasher(
- num_rows,
- IdHashBuilder {},
- );
+ let mut hashmap =
+ JoinHashMap(RawTable::with_capacity(num_rows));
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
@@ -358,8 +366,7 @@ impl ExecutionPlan for HashJoinExec {
Ok(acc)
})
.await?;
- let mut hashmap =
- JoinHashMap::with_capacity_and_hasher(num_rows, IdHashBuilder {});
+ let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
@@ -460,7 +467,7 @@ impl ExecutionPlan for HashJoinExec {
fn update_hash(
on: &[Column],
batch: &RecordBatch,
- hash: &mut JoinHashMap,
+ hash_map: &mut JoinHashMap,
offset: usize,
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
@@ -476,18 +483,18 @@ fn update_hash(
// insert hashes to key of the hashmap
for (row, hash_value) in hash_values.iter().enumerate() {
- match hash.raw_entry_mut().from_hash(*hash_value, |_| true) {
- hashbrown::hash_map::RawEntryMut::Occupied(mut entry) => {
- entry.get_mut().push((row + offset) as u64);
- }
- hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
- entry.insert_hashed_nocheck(
- *hash_value,
- (),
- smallvec![(row + offset) as u64],
- );
- }
- };
+ let item = hash_map
+ .0
+ .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+ if let Some((_, indices)) = item {
+ indices.push((row + offset) as u64);
+ } else {
+ hash_map.0.insert(
+ *hash_value,
+ (*hash_value, smallvec![(row + offset) as u64]),
+ |(hash, _)| *hash,
+ );
+ }
}
Ok(())
}
@@ -678,7 +685,7 @@ fn build_join_indexes(
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
if let Some((_, indices)) =
- left.raw_entry().from_hash(*hash_value, |_| true)
+ left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
for &i in indices {
// Check hash collisions
@@ -710,7 +717,7 @@ fn build_join_indexes(
// First visit all of the rows
for (row, hash_value) in hash_values.iter().enumerate() {
if let Some((_, indices)) =
- left.raw_entry().from_hash(*hash_value, |_| true)
+ left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
for &i in indices {
// Collision check
@@ -728,7 +735,7 @@ fn build_join_indexes(
let mut right_indices = UInt32Builder::new(0);
for (row, hash_value) in hash_values.iter().enumerate() {
- match left.raw_entry().from_hash(*hash_value, |_| true) {
+ match left.0.get(*hash_value, |(hash, _)| *hash_value == *hash) {
Some((_, indices)) => {
for &i in indices {
if equal_rows(
@@ -755,38 +762,6 @@ fn build_join_indexes(
}
}
}
-use core::hash::BuildHasher;
-
-/// `Hasher` that returns the same `u64` value as a hash, to avoid re-hashing
-/// it when inserting/indexing or regrowing the `HashMap`
-struct IdHasher {
- hash: u64,
-}
-
-impl Hasher for IdHasher {
- fn finish(&self) -> u64 {
- self.hash
- }
-
- fn write_u64(&mut self, i: u64) {
- self.hash = i;
- }
-
- fn write(&mut self, _bytes: &[u8]) {
- unreachable!("IdHasher should only be used for u64 keys")
- }
-}
-
-#[derive(Debug)]
-struct IdHashBuilder {}
-
-impl BuildHasher for IdHashBuilder {
- type Hasher = IdHasher;
-
- fn build_hasher(&self) -> Self::Hasher {
- IdHasher { hash: 0 }
- }
-}
macro_rules! equal_rows_elem {
($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident) => {{
@@ -1776,7 +1751,7 @@ mod tests {
#[test]
fn join_with_hash_collision() -> Result<()> {
- let mut hashmap_left = HashMap::with_capacity_and_hasher(2, IdHashBuilder {});
+ let mut hashmap_left = RawTable::with_capacity(2);
let left = build_table_i32(
("a", &vec![10, 20]),
("x", &vec![100, 200]),
@@ -1788,19 +1763,9 @@ mod tests {
let hashes =
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
- // Create hash collisions
- match hashmap_left.raw_entry_mut().from_hash(hashes[0], |_| true) {
- hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
- entry.insert_hashed_nocheck(hashes[0], (), smallvec![0, 1])
- }
- _ => unreachable!("Hash should not be vacant"),
- };
- match hashmap_left.raw_entry_mut().from_hash(hashes[1], |_| true) {
- hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
- entry.insert_hashed_nocheck(hashes[1], (), smallvec![0, 1])
- }
- _ => unreachable!("Hash should not be vacant"),
- };
+ // Create hash collisions (same hashes)
+ hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
+ hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
let right = build_table_i32(
("a", &vec![10, 20]),
@@ -1808,7 +1773,7 @@ mod tests {
("c", &vec![30, 40]),
);
- let left_data = JoinLeftData::new((hashmap_left, left));
+ let left_data = JoinLeftData::new((JoinHashMap(hashmap_left), left));
let (l, r) = build_join_indexes(
&left_data,
&right,