You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/15 14:45:17 UTC
[arrow-datafusion] 01/01: Change HashJoin datastructure
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch adapt_datastructure
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 763c24d6c8658809ebba724e9835e3efda9862b5
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Thu Jun 15 16:45:05 2023 +0200
Change HashJoin datastructure
---
.../core/src/physical_plan/joins/hash_join.rs | 41 ++++++++++++++--------
.../src/physical_plan/joins/hash_join_utils.rs | 18 ++++++----
.../src/physical_plan/joins/symmetric_hash_join.rs | 28 +++++++++++++--
3 files changed, 62 insertions(+), 25 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 0e62540d6d..f097e913d8 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -43,8 +43,6 @@ use arrow::{
util::bit_util,
};
use futures::{ready, Stream, StreamExt, TryStreamExt};
-use hashbrown::raw::RawTable;
-use smallvec::smallvec;
use std::fmt;
use std::sync::Arc;
use std::task::Poll;
@@ -518,9 +516,9 @@ async fn collect_left_input(
reservation.try_grow(estimated_hastable_size)?;
metrics.build_mem_used.add(estimated_hastable_size);
- let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
+ let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
- let mut offset = 0;
+ let mut offset = 1;
for batch in batches.iter() {
hashes_buffer.clear();
hashes_buffer.resize(batch.num_rows(), 0);
@@ -565,14 +563,20 @@ pub fn update_hash(
let item = hash_map
.0
.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
- if let Some((_, indices)) = item {
- indices.push((row + offset) as u64);
+ if let Some((_, index)) = item {
+ // Already exists: add index to next array
+ let prev_index = *index;
+ *index = (row + offset) as u64;
+ // update chained Vec
+ hash_map.1[*index as usize] = prev_index;
+
} else {
hash_map.0.insert(
*hash_value,
- (*hash_value, smallvec![(row + offset) as u64]),
+ (*hash_value, (row + offset) as u64),
|(hash, _)| *hash,
);
+            // chained list is initialized with 0
}
}
Ok(())
@@ -727,13 +731,13 @@ pub fn build_equal_condition_join_indices(
// For every item on the build and probe we check if it matches
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
- if let Some((_, indices)) = build_hashmap
+ if let Some((_, index)) = build_hashmap
.0
.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
- for &i in indices {
- // Check hash collisions
- let offset_build_index = i as usize - offset_value;
+ let mut i = *index;
+ loop {
+ let offset_build_index = i as usize - offset_value - 1;
// Check hash collisions
if equal_rows(
offset_build_index,
@@ -745,6 +749,11 @@ pub fn build_equal_condition_join_indices(
build_indices.append(offset_build_index as u64);
probe_indices.append(row as u32);
}
+ if build_hashmap.1[i as usize] != 0 {
+ i = build_hashmap.1[i as usize];
+ } else {
+ break;
+ }
}
}
}
@@ -1258,11 +1267,11 @@ mod tests {
use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
- use smallvec::smallvec;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::Literal;
+ use hashbrown::raw::RawTable;
use crate::execution::context::SessionConfig;
use crate::physical_expr::expressions::BinaryExpr;
@@ -2616,8 +2625,10 @@ mod tests {
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
// Create hash collisions (same hashes)
- hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
- hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
+ hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
+ hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
+
+ let next = vec![0, 2, 0];
let right = build_table_i32(
("a", &vec![10, 20]),
@@ -2625,7 +2636,7 @@ mod tests {
("c", &vec![30, 40]),
);
- let left_data = (JoinHashMap(hashmap_left), left);
+ let left_data = (JoinHashMap(hashmap_left, next), left);
let (l, r) = build_equal_condition_join_indices(
&left_data.0,
&left_data.1,
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 992de86dfe..59db05164d 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -30,13 +30,13 @@ use datafusion_physical_expr::intervals::Interval;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use hashbrown::raw::RawTable;
-use smallvec::SmallVec;
use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
use datafusion_common::Result;
// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
-//
+// The indices (values) are stored in a separate chained list based on (index, next).
+// The first item in the list is reserved.
// Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
// to put the indices in a certain bucket.
// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
@@ -47,9 +47,13 @@ use datafusion_common::Result;
// but the values don't match. Those are checked in the [equal_rows] macro
// TODO: speed up collision check and move away from using a hashbrown HashMap
// https://github.com/apache/arrow-datafusion/issues/50
-pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
+pub struct JoinHashMap(pub RawTable<(u64, u64)>, pub Vec<u64>);
impl JoinHashMap {
+ pub(crate) fn with_capacity(capacity: usize) -> JoinHashMap {
+ JoinHashMap(RawTable::with_capacity(capacity), vec![0; capacity + 1])
+ }
+
/// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
/// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
/// when necessary. You can adjust the scale_factor value to achieve the desired
@@ -67,10 +71,11 @@ impl JoinHashMap {
let new_capacity = (capacity * (scale_factor - 1)) / scale_factor;
self.0.shrink_to(new_capacity, |(hash, _)| *hash)
}
+ // todo handle chained list
}
pub(crate) fn size(&self) -> usize {
- self.0.allocation_info().1.size()
+ self.0.allocation_info().1.size() + self.1.capacity() * 16 + 16
}
}
@@ -290,7 +295,6 @@ pub mod tests {
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, cast, col, lit};
- use smallvec::smallvec;
use std::sync::Arc;
/// Filter expr for a + b > c + 10 AND a + b < c + 100
@@ -628,14 +632,14 @@ pub mod tests {
#[test]
fn test_shrink_if_necessary() {
let scale_factor = 4;
- let mut join_hash_map = JoinHashMap(RawTable::with_capacity(100));
+ let mut join_hash_map = JoinHashMap::with_capacity(100);
let data_size = 2000;
let deleted_part = 3 * data_size / 4;
// Add elements to the JoinHashMap
for hash_value in 0..data_size {
join_hash_map.0.insert(
hash_value,
- (hash_value, smallvec![hash_value]),
+ (hash_value, hash_value),
|(hash, _)| *hash,
);
}
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 7eac619687..80ea9e65dc 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -702,8 +702,30 @@ fn prune_hash_values(
.0
.get_mut(*hash_value, |(hash, _)| hash_value == hash)
{
- separation_chain.retain(|n| !index_set.contains(n));
- if separation_chain.is_empty() {
+ let mut size = 0;
+ let mut i = separation_chain;
+ let mut prev_i = i;
+
+ let mut keep = false;
+
+ // TODO
+ // loop {
+ // if !index_set.contains(i) {
+ // if !keep {
+ // *prev_i = i;
+ // }
+ // // retain this value
+ // keep = true;
+ // size += 1;
+ // }
+ // // drop value
+ // *prev_i = i;
+
+ // if *i == 0 {
+ // break;
+ // }
+ // }
+ if size == 0 {
hashmap
.0
.remove_entry(*hash_value, |(hash, _)| hash_value == hash);
@@ -1076,7 +1098,7 @@ impl OneSideHashJoiner {
build_side,
input_buffer: RecordBatch::new_empty(schema),
on,
- hashmap: JoinHashMap(RawTable::with_capacity(0)),
+ hashmap: JoinHashMap::with_capacity(0),
row_hash_values: VecDeque::new(),
hashes_buffer: vec![],
visited_rows: HashSet::new(),