You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/16 13:15:29 UTC
[arrow-datafusion] branch adapt_datastructure updated: Cleanup, fix symmetric hash join
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch adapt_datastructure
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/adapt_datastructure by this push:
new 23f3acdbc6 Cleanup, fix symmetric hash join
23f3acdbc6 is described below
commit 23f3acdbc6f082e04484013927583746f37d10d9
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Fri Jun 16 15:15:23 2023 +0200
Cleanup, fix symmetric hash join
---
.../core/src/physical_plan/joins/hash_join.rs | 17 +-
.../src/physical_plan/joins/hash_join_utils.rs | 36 ++--
.../src/physical_plan/joins/symmetric_hash_join.rs | 220 ++++++++++++++++++---
3 files changed, 219 insertions(+), 54 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 4ad71665ec..a2ced794cb 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -568,8 +568,7 @@ pub fn update_hash(
let prev_index = *index;
*index = (row + offset) as u64;
// update chained Vec
- hash_map.1[*index as usize] = prev_index;
-
+ hash_map.1[(*index - 1) as usize] = prev_index;
} else {
hash_map.0.insert(
*hash_value,
@@ -735,9 +734,9 @@ pub fn build_equal_condition_join_indices(
.0
.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
- let mut i = *index;
+ let mut i = *index - 1;
loop {
- let offset_build_index = i as usize - offset_value - 1;
+ let offset_build_index = i as usize - offset_value;
// Check hash collisions
if equal_rows(
offset_build_index,
@@ -749,11 +748,11 @@ pub fn build_equal_condition_join_indices(
build_indices.append(offset_build_index as u64);
probe_indices.append(row as u32);
}
- if build_hashmap.1[i as usize] != 0 {
- i = build_hashmap.1[i as usize];
- } else {
+ if build_hashmap.1[i as usize] == 0 {
+ // end of list
break;
}
+ i = build_hashmap.1[i as usize] - 1;
}
}
}
@@ -831,7 +830,7 @@ macro_rules! equal_rows_elem_with_string_dict {
/// Left and right row have equal values
/// If more data types are supported here, please also add the data types in can_hash function
/// to generate hash join logical plan.
-fn equal_rows(
+pub fn equal_rows(
left: usize,
right: usize,
left_arrays: &[ArrayRef],
@@ -2620,7 +2619,7 @@ mod tests {
hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
- let next = vec![0, 2, 0];
+ let next = vec![2, 0];
let right = build_table_i32(
("a", &vec![10, 20]),
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 59db05164d..28af042390 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -30,34 +30,44 @@ use datafusion_physical_expr::intervals::Interval;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use hashbrown::raw::RawTable;
+use smallvec::SmallVec;
use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
use datafusion_common::Result;
// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
-// The indices (values) are stored in a separate chained list based on (index, next).
-// The first item in the list is reserved.
-// Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
-// to put the indices in a certain bucket.
// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
// As the key is a hash value, we need to check possible hash collisions in the probe stage
// During this stage it might be the case that a row is contained the same hashmap value,
// but the values don't match. Those are checked in the [equal_rows] macro
-// TODO: speed up collision check and move away from using a hashbrown HashMap
+// The indices (values) are stored as a chained list in the separate `Vec<u64>`: the hashmap
+// holds the head of each chain, and slot `i` of the `Vec` holds the chain entry that follows row `i`.
+// Entries are stored as row index + 1, so that a value of 0 can mean end of list.
+// TODO: speed up collision checks
// https://github.com/apache/arrow-datafusion/issues/50
pub struct JoinHashMap(pub RawTable<(u64, u64)>, pub Vec<u64>);
+/// Similar to [`JoinHashMap`], but the row indices of each hash value are stored inline in a
+/// `SmallVec` instead of a chained list, so individual indices can be removed and the table shrunk.
+pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
+
impl JoinHashMap {
- pub(crate) fn with_capacity(capacity: usize) -> JoinHashMap {
- JoinHashMap(RawTable::with_capacity(capacity), vec![0; capacity + 1])
- }
+ pub(crate) fn with_capacity(capacity: usize) -> Self {
+ JoinHashMap(RawTable::with_capacity(capacity), vec![0; capacity]) // one zeroed `next` slot per row; 0 = end of chain
+ }
+}
+
+impl SymmetricJoinHashMap {
+ pub(crate) fn with_capacity(capacity: usize) -> Self {
+ Self(RawTable::with_capacity(capacity))
+ }
/// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
/// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
/// when necessary. You can adjust the scale_factor value to achieve the desired
- /// ,balance between memory usage and performance.
+ /// balance between memory usage and performance.
//
// If you increase the scale_factor, the capacity will shrink less aggressively,
// leading to potentially higher memory usage but fewer resizes.
@@ -71,11 +81,10 @@ impl JoinHashMap {
let new_capacity = (capacity * (scale_factor - 1)) / scale_factor;
self.0.shrink_to(new_capacity, |(hash, _)| *hash)
}
- // todo handle chained list
}
pub(crate) fn size(&self) -> usize {
- self.0.allocation_info().1.size() + self.1.capacity() * 16 + 16
+ self.0.allocation_info().1.size()
}
}
@@ -295,6 +304,7 @@ pub mod tests {
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, cast, col, lit};
+ use smallvec::smallvec;
use std::sync::Arc;
/// Filter expr for a + b > c + 10 AND a + b < c + 100
@@ -632,14 +642,14 @@ pub mod tests {
#[test]
fn test_shrink_if_necessary() {
let scale_factor = 4;
- let mut join_hash_map = JoinHashMap::with_capacity(100);
+ let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100);
let data_size = 2000;
let deleted_part = 3 * data_size / 4;
// Add elements to the JoinHashMap
for hash_value in 0..data_size {
join_hash_map.0.insert(
hash_value,
- (hash_value, hash_value),
+ (hash_value, smallvec![hash_value]),
|(hash, _)| *hash,
);
}
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 80ea9e65dc..f2b750c0b6 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -41,10 +41,15 @@ use arrow::array::{
use arrow::compute::concat_batches;
use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
+use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder};
+use arrow_array::{UInt32Array, UInt64Array};
+use datafusion_physical_expr::hash_utils::create_hashes;
+use datafusion_physical_expr::PhysicalExpr;
use futures::stream::{select, BoxStream};
use futures::{Stream, StreamExt};
-use hashbrown::{raw::RawTable, HashSet};
+use hashbrown::HashSet;
use parking_lot::Mutex;
+use smallvec::smallvec;
use datafusion_common::{utils::bisect, ScalarValue};
use datafusion_execution::memory_pool::MemoryConsumer;
@@ -52,12 +57,10 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalB
use crate::physical_plan::common::SharedMemoryReservation;
use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
-use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
use crate::physical_plan::{
expressions::Column,
expressions::PhysicalSortExpr,
joins::{
- hash_join::{build_join_indices, update_hash},
hash_join_utils::{build_filter_input_order, SortedFilterExpr},
utils::{
build_batch_from_indices, build_join_schema, check_join_is_valid,
@@ -73,6 +76,10 @@ use datafusion_common::JoinType;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::TaskContext;
+use super::hash_join::equal_rows;
+use super::hash_join_utils::SymmetricJoinHashMap;
+use super::utils::apply_join_filter_to_indices;
+
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
/// A symmetric hash join with range conditions is when both streams are hashed on the
@@ -681,7 +688,7 @@ impl Stream for SymmetricHashJoinStream {
fn prune_hash_values(
prune_length: usize,
- hashmap: &mut JoinHashMap,
+ hashmap: &mut SymmetricJoinHashMap,
row_hash_values: &mut VecDeque<u64>,
offset: u64,
) -> Result<()> {
@@ -702,30 +709,8 @@ fn prune_hash_values(
.0
.get_mut(*hash_value, |(hash, _)| hash_value == hash)
{
- let mut size = 0;
- let mut i = separation_chain;
- let mut prev_i = i;
-
- let mut keep = false;
-
- // TODO
- // loop {
- // if !index_set.contains(i) {
- // if !keep {
- // *prev_i = i;
- // }
- // // retain this value
- // keep = true;
- // size += 1;
- // }
- // // drop value
- // *prev_i = i;
-
- // if *i == 0 {
- // break;
- // }
- // }
- if size == 0 {
+ separation_chain.retain(|n| !index_set.contains(n));
+ if separation_chain.is_empty() {
hashmap
.0
.remove_entry(*hash_value, |(hash, _)| hash_value == hash);
@@ -1065,7 +1050,7 @@ struct OneSideHashJoiner {
/// Columns from the side
on: Vec<Column>,
/// Hashmap
- hashmap: JoinHashMap,
+ hashmap: SymmetricJoinHashMap,
/// To optimize hash deleting in case of pruning, we hold them in memory
row_hash_values: VecDeque<u64>,
/// Reuse the hashes buffer
@@ -1098,7 +1083,7 @@ impl OneSideHashJoiner {
build_side,
input_buffer: RecordBatch::new_empty(schema),
on,
- hashmap: JoinHashMap::with_capacity(0),
+ hashmap: SymmetricJoinHashMap::with_capacity(0),
row_hash_values: VecDeque::new(),
hashes_buffer: vec![],
visited_rows: HashSet::new(),
@@ -1107,6 +1092,39 @@ impl OneSideHashJoiner {
}
}
+ pub fn update_hash(
+ on: &[Column],
+ batch: &RecordBatch,
+ hash_map: &mut SymmetricJoinHashMap,
+ offset: usize,
+ random_state: &RandomState,
+ hashes_buffer: &mut Vec<u64>,
+ ) -> Result<()> {
+ // Evaluate the join key expressions (`on`) on the incoming batch
+ let keys_values = on
+ .iter()
+ .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ // Compute one hash value per row from the key arrays, reusing `hashes_buffer`
+ let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+ // Append each row's absolute index (row + offset) to its hash bucket, creating the bucket if absent
+ for (row, hash_value) in hash_values.iter().enumerate() {
+ let item = hash_map
+ .0
+ .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+ if let Some((_, indices)) = item {
+ indices.push((row + offset) as u64);
+ } else {
+ hash_map.0.insert(
+ *hash_value,
+ (*hash_value, smallvec![(row + offset) as u64]),
+ |(hash, _)| *hash,
+ );
+ }
+ }
+ Ok(())
+ }
+
/// Updates the internal state of the [OneSideHashJoiner] with the incoming batch.
///
/// # Arguments
@@ -1128,7 +1146,7 @@ impl OneSideHashJoiner {
self.hashes_buffer.resize(batch.num_rows(), 0);
// Get allocation_info before adding the item
// Update the hashmap with the join key values and hashes of the incoming batch:
- update_hash(
+ Self::update_hash(
&self.on,
batch,
&mut self.hashmap,
@@ -1141,6 +1159,144 @@ impl OneSideHashJoiner {
Ok(())
}
+ /// Gets build and probe indices which satisfy the on condition (including
+ /// the equality condition and the join filter) in the join.
+ #[allow(clippy::too_many_arguments)]
+ pub fn build_join_indices(
+ probe_batch: &RecordBatch,
+ build_hashmap: &SymmetricJoinHashMap,
+ build_input_buffer: &RecordBatch,
+ on_build: &[Column],
+ on_probe: &[Column],
+ filter: Option<&JoinFilter>,
+ random_state: &RandomState,
+ null_equals_null: bool,
+ hashes_buffer: &mut Vec<u64>,
+ offset: Option<usize>,
+ build_side: JoinSide,
+ ) -> Result<(UInt64Array, UInt32Array)> {
+ // Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
+ let (build_indices, probe_indices) = Self::build_equal_condition_join_indices(
+ build_hashmap,
+ build_input_buffer,
+ probe_batch,
+ on_build,
+ on_probe,
+ random_state,
+ null_equals_null,
+ hashes_buffer,
+ offset,
+ )?;
+ if let Some(filter) = filter {
+ // Keep only the index pairs that also satisfy the residual join filter, like `left.b1 = 10`
+ apply_join_filter_to_indices(
+ build_input_buffer,
+ probe_batch,
+ build_indices,
+ probe_indices,
+ filter,
+ build_side,
+ )
+ } else {
+ Ok((build_indices, probe_indices))
+ }
+ }
+
+ // Returns build/probe indices satisfying the equality condition.
+ // On LEFT.b1 = RIGHT.b2
+ // LEFT Table:
+ // a1 b1 c1
+ // 1 1 10
+ // 3 3 30
+ // 5 5 50
+ // 7 7 70
+ // 9 8 90
+ // 11 8 110
+ // 13 10 130
+ // RIGHT Table:
+ // a2 b2 c2
+ // 2 2 20
+ // 4 4 40
+ // 6 6 60
+ // 8 8 80
+ // 10 10 100
+ // 12 10 120
+ // The result is
+ // "+----+----+-----+----+----+-----+",
+ // "| a1 | b1 | c1 | a2 | b2 | c2 |",
+ // "+----+----+-----+----+----+-----+",
+ // "| 9 | 8 | 90 | 8 | 8 | 80 |",
+ // "| 11 | 8 | 110 | 8 | 8 | 80 |",
+ // "| 13 | 10 | 130 | 10 | 10 | 100 |",
+ // "| 13 | 10 | 130 | 12 | 10 | 120 |",
+ // "+----+----+-----+----+----+-----+"
+ // And the resulting build and probe indices (build rows in insertion order) are:
+ // Build indices: 4, 5, 6, 6
+ // Probe indices: 3, 3, 4, 5
+ #[allow(clippy::too_many_arguments)]
+ pub fn build_equal_condition_join_indices(
+ build_hashmap: &SymmetricJoinHashMap,
+ build_input_buffer: &RecordBatch,
+ probe_batch: &RecordBatch,
+ build_on: &[Column],
+ probe_on: &[Column],
+ random_state: &RandomState,
+ null_equals_null: bool,
+ hashes_buffer: &mut Vec<u64>,
+ offset: Option<usize>,
+ ) -> Result<(UInt64Array, UInt32Array)> {
+ let keys_values = probe_on
+ .iter()
+ .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ let build_join_values = build_on
+ .iter()
+ .map(|c| {
+ Ok(c.evaluate(build_input_buffer)?
+ .into_array(build_input_buffer.num_rows()))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ hashes_buffer.clear();
+ hashes_buffer.resize(probe_batch.num_rows(), 0);
+ let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+ // Using a buffer builder to avoid slower normal builder
+ let mut build_indices = UInt64BufferBuilder::new(0);
+ let mut probe_indices = UInt32BufferBuilder::new(0);
+ let offset_value = offset.unwrap_or(0);
+ // Visit all of the probe rows
+ for (row, hash_value) in hash_values.iter().enumerate() {
+ // Get the hash and find it in the build index
+ // For every item on the build and probe we check if it matches
+ // This possibly contains rows with hash collisions,
+ // So we have to check here whether rows are equal or not
+ if let Some((_, indices)) = build_hashmap
+ .0
+ .get(*hash_value, |(hash, _)| *hash_value == *hash)
+ {
+ for &i in indices {
+ // Translate the absolute row index into an index within the build input buffer
+ let offset_build_index = i as usize - offset_value;
+ // Check hash collisions
+ if equal_rows(
+ offset_build_index,
+ row,
+ &build_join_values,
+ &keys_values,
+ null_equals_null,
+ )? {
+ build_indices.append(offset_build_index as u64);
+ probe_indices.append(row as u32);
+ }
+ }
+ }
+ }
+
+ Ok((
+ PrimitiveArray::new(build_indices.finish().into(), None),
+ PrimitiveArray::new(probe_indices.finish().into(), None),
+ ))
+ }
+
/// This method performs a join between the build side input buffer and the probe side batch.
///
/// # Arguments
@@ -1177,7 +1333,7 @@ impl OneSideHashJoiner {
if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
return Ok(None);
}
- let (build_indices, probe_indices) = build_join_indices(
+ let (build_indices, probe_indices) = Self::build_join_indices(
probe_batch,
&self.hashmap,
&self.input_buffer,