You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/19 12:34:30 UTC
[arrow-datafusion] branch main updated: Improve performance/memory usage of HashJoin datastructure (5-15% improvement on selected TPC-H queries) (#6679)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 26c90c211e Improve performance/memory usage of HashJoin datastructure (5-15% improvement on selected TPC-H queries) (#6679)
26c90c211e is described below
commit 26c90c211e06407c6a100170f46d2591b41b11a2
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Mon Jun 19 14:34:23 2023 +0200
Improve performance/memory usage of HashJoin datastructure (5-15% improvement on selected TPC-H queries) (#6679)
* Change HashJoin datastructure
* Simplify a bit
* Simplify a bit
* Cleanup, fix symmetric hash join
* Cleanup
* Cleanup
* Add docs
* Add docs
* Use named struct
* Use named struct
* Comment
* Add example
* Update / simplify memory calculation with new datastructure
* Fmt
* Remove offset
---------
Co-authored-by: Daniël Heres <da...@coralogix.com>
---
.../core/src/physical_plan/joins/hash_join.rs | 88 +++++-----
.../src/physical_plan/joins/hash_join_utils.rs | 78 ++++++++-
.../src/physical_plan/joins/symmetric_hash_join.rs | 194 ++++++++++++++++++++-
3 files changed, 305 insertions(+), 55 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 0e62540d6d..9d016a60f4 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -30,7 +30,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::{
array::{
- ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
+ ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
DictionaryArray, FixedSizeBinaryArray, LargeStringArray, PrimitiveArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
@@ -43,9 +43,8 @@ use arrow::{
util::bit_util,
};
use futures::{ready, Stream, StreamExt, TryStreamExt};
-use hashbrown::raw::RawTable;
-use smallvec::smallvec;
use std::fmt;
+use std::mem::size_of;
use std::sync::Arc;
use std::task::Poll;
use std::{any::Any, usize, vec};
@@ -510,15 +509,16 @@ async fn collect_left_input(
)
})? / 7)
.next_power_of_two();
- // 32 bytes per `(u64, SmallVec<[u64; 1]>)`
+ // 16 bytes per `(u64, u64)`
// + 1 byte for each bucket
- // + 16 bytes fixed
- let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16;
+ // + fixed size of JoinHashMap (RawTable + Vec)
+ let estimated_hastable_size =
+ 16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();
reservation.try_grow(estimated_hastable_size)?;
metrics.build_mem_used.add(estimated_hastable_size);
- let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
+ let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
let mut offset = 0;
for batch in batches.iter() {
@@ -563,16 +563,24 @@ pub fn update_hash(
// insert hashes to key of the hashmap
for (row, hash_value) in hash_values.iter().enumerate() {
let item = hash_map
- .0
+ .map
.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
- if let Some((_, indices)) = item {
- indices.push((row + offset) as u64);
+ if let Some((_, index)) = item {
+ // Already exists: add index to next array
+ let prev_index = *index;
+ // Store new value inside hashmap
+ *index = (row + offset + 1) as u64;
+ // Update chained Vec at row + offset with previous value
+ hash_map.next[row + offset] = prev_index;
} else {
- hash_map.0.insert(
+ hash_map.map.insert(
*hash_value,
- (*hash_value, smallvec![(row + offset) as u64]),
+ // store the value + 1 as 0 value reserved for end of list
+ (*hash_value, (row + offset + 1) as u64),
|(hash, _)| *hash,
);
+ // chained list at (row + offset) is already initialized with 0
+ // meaning end of list
}
}
Ok(())
@@ -629,7 +637,6 @@ pub fn build_join_indices(
random_state: &RandomState,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
- offset: Option<usize>,
build_side: JoinSide,
) -> Result<(UInt64Array, UInt32Array)> {
// Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
@@ -642,7 +649,6 @@ pub fn build_join_indices(
random_state,
null_equals_null,
hashes_buffer,
- offset,
)?;
if let Some(filter) = filter {
// Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
@@ -700,7 +706,6 @@ pub fn build_equal_condition_join_indices(
random_state: &RandomState,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
- offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
let keys_values = probe_on
.iter()
@@ -719,7 +724,6 @@ pub fn build_equal_condition_join_indices(
// Using a buffer builder to avoid slower normal builder
let mut build_indices = UInt64BufferBuilder::new(0);
let mut probe_indices = UInt32BufferBuilder::new(0);
- let offset_value = offset.unwrap_or(0);
// Visit all of the probe rows
for (row, hash_value) in hash_values.iter().enumerate() {
// Get the hash and find it in the build index
@@ -727,39 +731,37 @@ pub fn build_equal_condition_join_indices(
// For every item on the build and probe we check if it matches
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
- if let Some((_, indices)) = build_hashmap
- .0
+ if let Some((_, index)) = build_hashmap
+ .map
.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
- for &i in indices {
- // Check hash collisions
- let offset_build_index = i as usize - offset_value;
+ let mut i = *index - 1;
+ loop {
// Check hash collisions
if equal_rows(
- offset_build_index,
+ i as usize,
row,
&build_join_values,
&keys_values,
null_equals_null,
)? {
- build_indices.append(offset_build_index as u64);
+ build_indices.append(i);
probe_indices.append(row as u32);
}
+ // Follow the chain to get the next index value
+ let next = build_hashmap.next[i as usize];
+ if next == 0 {
+ // end of list
+ break;
+ }
+ i = next - 1;
}
}
}
- let build = ArrayData::builder(DataType::UInt64)
- .len(build_indices.len())
- .add_buffer(build_indices.finish())
- .build()?;
- let probe = ArrayData::builder(DataType::UInt32)
- .len(probe_indices.len())
- .add_buffer(probe_indices.finish())
- .build()?;
Ok((
- PrimitiveArray::<UInt64Type>::from(build),
- PrimitiveArray::<UInt32Type>::from(probe),
+ PrimitiveArray::new(build_indices.finish().into(), None),
+ PrimitiveArray::new(probe_indices.finish().into(), None),
))
}
@@ -830,7 +832,7 @@ macro_rules! equal_rows_elem_with_string_dict {
/// Left and right row have equal values
/// If more data types are supported here, please also add the data types in can_hash function
/// to generate hash join logical plan.
-fn equal_rows(
+pub fn equal_rows(
left: usize,
right: usize,
left_arrays: &[ArrayRef],
@@ -1157,7 +1159,6 @@ impl HashJoinStream {
&self.random_state,
self.null_equals_null,
&mut hashes_buffer,
- None,
JoinSide::Left,
);
@@ -1258,11 +1259,11 @@ mod tests {
use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
- use smallvec::smallvec;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::Literal;
+ use hashbrown::raw::RawTable;
use crate::execution::context::SessionConfig;
use crate::physical_expr::expressions::BinaryExpr;
@@ -2616,8 +2617,10 @@ mod tests {
create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
// Create hash collisions (same hashes)
- hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
- hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
+ hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
+ hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
+
+ let next = vec![2, 0];
let right = build_table_i32(
("a", &vec![10, 20]),
@@ -2625,7 +2628,13 @@ mod tests {
("c", &vec![30, 40]),
);
- let left_data = (JoinHashMap(hashmap_left), left);
+ let left_data = (
+ JoinHashMap {
+ map: hashmap_left,
+ next,
+ },
+ left,
+ );
let (l, r) = build_equal_condition_join_indices(
&left_data.0,
&left_data.1,
@@ -2635,7 +2644,6 @@ mod tests {
&random_state,
false,
&mut vec![0; right.num_rows()],
- None,
)?;
let mut left_ids = UInt64Builder::with_capacity(0);
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 992de86dfe..1b9cbd543d 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -36,24 +36,88 @@ use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
use datafusion_common::Result;
// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
-//
-// Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
-// to put the indices in a certain bucket.
// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
// As the key is a hash value, we need to check possible hash collisions in the probe stage
// During this stage it might be the case that a row is contained the same hashmap value,
// but the values don't match. Those are checked in the [equal_rows] macro
-// TODO: speed up collision check and move away from using a hashbrown HashMap
+// The indices (values) are stored in a separate chained list stored in the `Vec<u64>`.
+// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value.
+// The chain can be followed until the value "0" has been reached, meaning the end of the list.
+// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
+// See the example below:
+// Insert (1,1)
+// map:
+// ---------
+// | 1 | 2 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 0 | 0 |
+// ---------------------
+// Insert (2,2)
+// map:
+// ---------
+// | 1 | 2 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 0 | 0 |
+// ---------------------
+// Insert (1,3)
+// map:
+// ---------
+// | 1 | 4 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 2 | 0 | <--- hash value 1 maps to 4,2 (which means indices values 3,1)
+// ---------------------
+// Insert (1,4)
+// map:
+// ---------
+// | 1 | 5 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1)
+// ---------------------
+
+// TODO: speed up collision checks
// https://github.com/apache/arrow-datafusion/issues/50
-pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
+pub struct JoinHashMap {
+ // Stores hash value to first index
+ pub map: RawTable<(u64, u64)>,
+ // Stores indices in chained list data structure
+ pub next: Vec<u64>,
+}
+
+/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate
+/// and shrink the indices.
+pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
impl JoinHashMap {
+ pub(crate) fn with_capacity(capacity: usize) -> Self {
+ JoinHashMap {
+ map: RawTable::with_capacity(capacity),
+ next: vec![0; capacity],
+ }
+ }
+}
+
+impl SymmetricJoinHashMap {
+ pub(crate) fn with_capacity(capacity: usize) -> Self {
+ Self(RawTable::with_capacity(capacity))
+ }
+
/// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
/// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
/// when necessary. You can adjust the scale_factor value to achieve the desired
- /// ,balance between memory usage and performance.
+ /// balance between memory usage and performance.
//
// If you increase the scale_factor, the capacity will shrink less aggressively,
// leading to potentially higher memory usage but fewer resizes.
@@ -628,7 +692,7 @@ pub mod tests {
#[test]
fn test_shrink_if_necessary() {
let scale_factor = 4;
- let mut join_hash_map = JoinHashMap(RawTable::with_capacity(100));
+ let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100);
let data_size = 2000;
let deleted_part = 3 * data_size / 4;
// Add elements to the JoinHashMap
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 7eac619687..f2b750c0b6 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -41,10 +41,15 @@ use arrow::array::{
use arrow::compute::concat_batches;
use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
+use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder};
+use arrow_array::{UInt32Array, UInt64Array};
+use datafusion_physical_expr::hash_utils::create_hashes;
+use datafusion_physical_expr::PhysicalExpr;
use futures::stream::{select, BoxStream};
use futures::{Stream, StreamExt};
-use hashbrown::{raw::RawTable, HashSet};
+use hashbrown::HashSet;
use parking_lot::Mutex;
+use smallvec::smallvec;
use datafusion_common::{utils::bisect, ScalarValue};
use datafusion_execution::memory_pool::MemoryConsumer;
@@ -52,12 +57,10 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalB
use crate::physical_plan::common::SharedMemoryReservation;
use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
-use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
use crate::physical_plan::{
expressions::Column,
expressions::PhysicalSortExpr,
joins::{
- hash_join::{build_join_indices, update_hash},
hash_join_utils::{build_filter_input_order, SortedFilterExpr},
utils::{
build_batch_from_indices, build_join_schema, check_join_is_valid,
@@ -73,6 +76,10 @@ use datafusion_common::JoinType;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::TaskContext;
+use super::hash_join::equal_rows;
+use super::hash_join_utils::SymmetricJoinHashMap;
+use super::utils::apply_join_filter_to_indices;
+
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
/// A symmetric hash join with range conditions is when both streams are hashed on the
@@ -681,7 +688,7 @@ impl Stream for SymmetricHashJoinStream {
fn prune_hash_values(
prune_length: usize,
- hashmap: &mut JoinHashMap,
+ hashmap: &mut SymmetricJoinHashMap,
row_hash_values: &mut VecDeque<u64>,
offset: u64,
) -> Result<()> {
@@ -1043,7 +1050,7 @@ struct OneSideHashJoiner {
/// Columns from the side
on: Vec<Column>,
/// Hashmap
- hashmap: JoinHashMap,
+ hashmap: SymmetricJoinHashMap,
/// To optimize hash deleting in case of pruning, we hold them in memory
row_hash_values: VecDeque<u64>,
/// Reuse the hashes buffer
@@ -1076,7 +1083,7 @@ impl OneSideHashJoiner {
build_side,
input_buffer: RecordBatch::new_empty(schema),
on,
- hashmap: JoinHashMap(RawTable::with_capacity(0)),
+ hashmap: SymmetricJoinHashMap::with_capacity(0),
row_hash_values: VecDeque::new(),
hashes_buffer: vec![],
visited_rows: HashSet::new(),
@@ -1085,6 +1092,39 @@ impl OneSideHashJoiner {
}
}
+ pub fn update_hash(
+ on: &[Column],
+ batch: &RecordBatch,
+ hash_map: &mut SymmetricJoinHashMap,
+ offset: usize,
+ random_state: &RandomState,
+ hashes_buffer: &mut Vec<u64>,
+ ) -> Result<()> {
+ // evaluate the keys
+ let keys_values = on
+ .iter()
+ .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ // calculate the hash values
+ let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+ // insert hashes to key of the hashmap
+ for (row, hash_value) in hash_values.iter().enumerate() {
+ let item = hash_map
+ .0
+ .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+ if let Some((_, indices)) = item {
+ indices.push((row + offset) as u64);
+ } else {
+ hash_map.0.insert(
+ *hash_value,
+ (*hash_value, smallvec![(row + offset) as u64]),
+ |(hash, _)| *hash,
+ );
+ }
+ }
+ Ok(())
+ }
+
/// Updates the internal state of the [OneSideHashJoiner] with the incoming batch.
///
/// # Arguments
@@ -1106,7 +1146,7 @@ impl OneSideHashJoiner {
self.hashes_buffer.resize(batch.num_rows(), 0);
// Get allocation_info before adding the item
// Update the hashmap with the join key values and hashes of the incoming batch:
- update_hash(
+ Self::update_hash(
&self.on,
batch,
&mut self.hashmap,
@@ -1119,6 +1159,144 @@ impl OneSideHashJoiner {
Ok(())
}
+ /// Gets build and probe indices which satisfy the on condition (including
+ /// the equality condition and the join filter) in the join.
+ #[allow(clippy::too_many_arguments)]
+ pub fn build_join_indices(
+ probe_batch: &RecordBatch,
+ build_hashmap: &SymmetricJoinHashMap,
+ build_input_buffer: &RecordBatch,
+ on_build: &[Column],
+ on_probe: &[Column],
+ filter: Option<&JoinFilter>,
+ random_state: &RandomState,
+ null_equals_null: bool,
+ hashes_buffer: &mut Vec<u64>,
+ offset: Option<usize>,
+ build_side: JoinSide,
+ ) -> Result<(UInt64Array, UInt32Array)> {
+ // Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
+ let (build_indices, probe_indices) = Self::build_equal_condition_join_indices(
+ build_hashmap,
+ build_input_buffer,
+ probe_batch,
+ on_build,
+ on_probe,
+ random_state,
+ null_equals_null,
+ hashes_buffer,
+ offset,
+ )?;
+ if let Some(filter) = filter {
+ // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
+ apply_join_filter_to_indices(
+ build_input_buffer,
+ probe_batch,
+ build_indices,
+ probe_indices,
+ filter,
+ build_side,
+ )
+ } else {
+ Ok((build_indices, probe_indices))
+ }
+ }
+
+ // Returns build/probe indices satisfying the equality condition.
+ // On LEFT.b1 = RIGHT.b2
+ // LEFT Table:
+ // a1 b1 c1
+ // 1 1 10
+ // 3 3 30
+ // 5 5 50
+ // 7 7 70
+ // 9 8 90
+ // 11 8 110
+ // 13 10 130
+ // RIGHT Table:
+ // a2 b2 c2
+ // 2 2 20
+ // 4 4 40
+ // 6 6 60
+ // 8 8 80
+ // 10 10 100
+ // 12 10 120
+ // The result is
+ // "+----+----+-----+----+----+-----+",
+ // "| a1 | b1 | c1 | a2 | b2 | c2 |",
+ // "+----+----+-----+----+----+-----+",
+ // "| 11 | 8 | 110 | 8 | 8 | 80 |",
+ // "| 13 | 10 | 130 | 10 | 10 | 100 |",
+ // "| 13 | 10 | 130 | 12 | 10 | 120 |",
+ // "| 9 | 8 | 90 | 8 | 8 | 80 |",
+ // "+----+----+-----+----+----+-----+"
+ // And the result of build and probe indices are:
+ // Build indices: 5, 6, 6, 4
+ // Probe indices: 3, 4, 5, 3
+ #[allow(clippy::too_many_arguments)]
+ pub fn build_equal_condition_join_indices(
+ build_hashmap: &SymmetricJoinHashMap,
+ build_input_buffer: &RecordBatch,
+ probe_batch: &RecordBatch,
+ build_on: &[Column],
+ probe_on: &[Column],
+ random_state: &RandomState,
+ null_equals_null: bool,
+ hashes_buffer: &mut Vec<u64>,
+ offset: Option<usize>,
+ ) -> Result<(UInt64Array, UInt32Array)> {
+ let keys_values = probe_on
+ .iter()
+ .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ let build_join_values = build_on
+ .iter()
+ .map(|c| {
+ Ok(c.evaluate(build_input_buffer)?
+ .into_array(build_input_buffer.num_rows()))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ hashes_buffer.clear();
+ hashes_buffer.resize(probe_batch.num_rows(), 0);
+ let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+ // Using a buffer builder to avoid slower normal builder
+ let mut build_indices = UInt64BufferBuilder::new(0);
+ let mut probe_indices = UInt32BufferBuilder::new(0);
+ let offset_value = offset.unwrap_or(0);
+ // Visit all of the probe rows
+ for (row, hash_value) in hash_values.iter().enumerate() {
+ // Get the hash and find it in the build index
+ // For every item on the build and probe we check if it matches
+ // This possibly contains rows with hash collisions,
+ // So we have to check here whether rows are equal or not
+ if let Some((_, indices)) = build_hashmap
+ .0
+ .get(*hash_value, |(hash, _)| *hash_value == *hash)
+ {
+ for &i in indices {
+ // Check hash collisions
+ let offset_build_index = i as usize - offset_value;
+ // Check hash collisions
+ if equal_rows(
+ offset_build_index,
+ row,
+ &build_join_values,
+ &keys_values,
+ null_equals_null,
+ )? {
+ build_indices.append(offset_build_index as u64);
+ probe_indices.append(row as u32);
+ }
+ }
+ }
+ }
+
+ Ok((
+ PrimitiveArray::new(build_indices.finish().into(), None),
+ PrimitiveArray::new(probe_indices.finish().into(), None),
+ ))
+ }
+
/// This method performs a join between the build side input buffer and the probe side batch.
///
/// # Arguments
@@ -1155,7 +1333,7 @@ impl OneSideHashJoiner {
if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
return Ok(None);
}
- let (build_indices, probe_indices) = build_join_indices(
+ let (build_indices, probe_indices) = Self::build_join_indices(
probe_batch,
&self.hashmap,
&self.input_buffer,