You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/19 12:34:30 UTC

[arrow-datafusion] branch main updated: Improve performance/memory usage of HashJoin datastructure (5-15% improvement on selected TPC-H queries) (#6679)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 26c90c211e Improve performance/memory usage of HashJoin datastructure (5-15% improvement on selected TPC-H queries) (#6679)
26c90c211e is described below

commit 26c90c211e06407c6a100170f46d2591b41b11a2
Author: Daniël Heres <da...@gmail.com>
AuthorDate: Mon Jun 19 14:34:23 2023 +0200

    Improve performance/memory usage of HashJoin datastructure (5-15% improvement on selected TPC-H queries) (#6679)
    
    * Change HashJoin datastructure
    
    * Simplify a bit
    
    * Simplify a bit
    
    * Cleanup, fix symmetric hash join
    
    * Cleanup
    
    * Cleanup
    
    * Add docs
    
    * Add docs
    
    * Use named struct
    
    * Use named struct
    
    * Comment
    
    * Add example
    
    * Update / simplify memory calculation with new datastructure
    
    * Fmt
    
    * Remove offset
    
    ---------
    
    Co-authored-by: Daniël Heres <da...@coralogix.com>
---
 .../core/src/physical_plan/joins/hash_join.rs      |  88 +++++-----
 .../src/physical_plan/joins/hash_join_utils.rs     |  78 ++++++++-
 .../src/physical_plan/joins/symmetric_hash_join.rs | 194 ++++++++++++++++++++-
 3 files changed, 305 insertions(+), 55 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 0e62540d6d..9d016a60f4 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -30,7 +30,7 @@ use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow::{
     array::{
-        ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
+        ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array,
         DictionaryArray, FixedSizeBinaryArray, LargeStringArray, PrimitiveArray,
         Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
         Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
@@ -43,9 +43,8 @@ use arrow::{
     util::bit_util,
 };
 use futures::{ready, Stream, StreamExt, TryStreamExt};
-use hashbrown::raw::RawTable;
-use smallvec::smallvec;
 use std::fmt;
+use std::mem::size_of;
 use std::sync::Arc;
 use std::task::Poll;
 use std::{any::Any, usize, vec};
@@ -510,15 +509,16 @@ async fn collect_left_input(
         )
     })? / 7)
         .next_power_of_two();
-    // 32 bytes per `(u64, SmallVec<[u64; 1]>)`
+    // 16 bytes per `(u64, u64)`
     // + 1 byte for each bucket
-    // + 16 bytes fixed
-    let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16;
+    // + fixed size of JoinHashMap (RawTable + Vec)
+    let estimated_hastable_size =
+        16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();
 
     reservation.try_grow(estimated_hastable_size)?;
     metrics.build_mem_used.add(estimated_hastable_size);
 
-    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
+    let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();
     let mut offset = 0;
     for batch in batches.iter() {
@@ -563,16 +563,24 @@ pub fn update_hash(
     // insert hashes to key of the hashmap
     for (row, hash_value) in hash_values.iter().enumerate() {
         let item = hash_map
-            .0
+            .map
             .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
-        if let Some((_, indices)) = item {
-            indices.push((row + offset) as u64);
+        if let Some((_, index)) = item {
+            // Already exists: add index to next array
+            let prev_index = *index;
+            // Store new value inside hashmap
+            *index = (row + offset + 1) as u64;
+            // Update chained Vec at row + offset with previous value
+            hash_map.next[row + offset] = prev_index;
         } else {
-            hash_map.0.insert(
+            hash_map.map.insert(
                 *hash_value,
-                (*hash_value, smallvec![(row + offset) as u64]),
+                // store the index + 1, since 0 is reserved to mark the end of the list
+                (*hash_value, (row + offset + 1) as u64),
                 |(hash, _)| *hash,
             );
+            // chained list at (row + offset) is already initialized with 0
+            // meaning end of list
         }
     }
     Ok(())
@@ -629,7 +637,6 @@ pub fn build_join_indices(
     random_state: &RandomState,
     null_equals_null: bool,
     hashes_buffer: &mut Vec<u64>,
-    offset: Option<usize>,
     build_side: JoinSide,
 ) -> Result<(UInt64Array, UInt32Array)> {
     // Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
@@ -642,7 +649,6 @@ pub fn build_join_indices(
         random_state,
         null_equals_null,
         hashes_buffer,
-        offset,
     )?;
     if let Some(filter) = filter {
         // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
@@ -700,7 +706,6 @@ pub fn build_equal_condition_join_indices(
     random_state: &RandomState,
     null_equals_null: bool,
     hashes_buffer: &mut Vec<u64>,
-    offset: Option<usize>,
 ) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = probe_on
         .iter()
@@ -719,7 +724,6 @@ pub fn build_equal_condition_join_indices(
     // Using a buffer builder to avoid slower normal builder
     let mut build_indices = UInt64BufferBuilder::new(0);
     let mut probe_indices = UInt32BufferBuilder::new(0);
-    let offset_value = offset.unwrap_or(0);
     // Visit all of the probe rows
     for (row, hash_value) in hash_values.iter().enumerate() {
         // Get the hash and find it in the build index
@@ -727,39 +731,37 @@ pub fn build_equal_condition_join_indices(
         // For every item on the build and probe we check if it matches
         // This possibly contains rows with hash collisions,
         // So we have to check here whether rows are equal or not
-        if let Some((_, indices)) = build_hashmap
-            .0
+        if let Some((_, index)) = build_hashmap
+            .map
             .get(*hash_value, |(hash, _)| *hash_value == *hash)
         {
-            for &i in indices {
-                // Check hash collisions
-                let offset_build_index = i as usize - offset_value;
+            let mut i = *index - 1;
+            loop {
                 // Check hash collisions
                 if equal_rows(
-                    offset_build_index,
+                    i as usize,
                     row,
                     &build_join_values,
                     &keys_values,
                     null_equals_null,
                 )? {
-                    build_indices.append(offset_build_index as u64);
+                    build_indices.append(i);
                     probe_indices.append(row as u32);
                 }
+                // Follow the chain to get the next index value
+                let next = build_hashmap.next[i as usize];
+                if next == 0 {
+                    // end of list
+                    break;
+                }
+                i = next - 1;
             }
         }
     }
-    let build = ArrayData::builder(DataType::UInt64)
-        .len(build_indices.len())
-        .add_buffer(build_indices.finish())
-        .build()?;
-    let probe = ArrayData::builder(DataType::UInt32)
-        .len(probe_indices.len())
-        .add_buffer(probe_indices.finish())
-        .build()?;
 
     Ok((
-        PrimitiveArray::<UInt64Type>::from(build),
-        PrimitiveArray::<UInt32Type>::from(probe),
+        PrimitiveArray::new(build_indices.finish().into(), None),
+        PrimitiveArray::new(probe_indices.finish().into(), None),
     ))
 }
 
@@ -830,7 +832,7 @@ macro_rules! equal_rows_elem_with_string_dict {
 /// Left and right row have equal values
 /// If more data types are supported here, please also add the data types in can_hash function
 /// to generate hash join logical plan.
-fn equal_rows(
+pub fn equal_rows(
     left: usize,
     right: usize,
     left_arrays: &[ArrayRef],
@@ -1157,7 +1159,6 @@ impl HashJoinStream {
                         &self.random_state,
                         self.null_equals_null,
                         &mut hashes_buffer,
-                        None,
                         JoinSide::Left,
                     );
 
@@ -1258,11 +1259,11 @@ mod tests {
 
     use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
     use arrow::datatypes::{DataType, Field, Schema};
-    use smallvec::smallvec;
 
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::Literal;
+    use hashbrown::raw::RawTable;
 
     use crate::execution::context::SessionConfig;
     use crate::physical_expr::expressions::BinaryExpr;
@@ -2616,8 +2617,10 @@ mod tests {
             create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?;
 
         // Create hash collisions (same hashes)
-        hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h);
-        hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h);
+        hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
+        hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
+
+        let next = vec![2, 0];
 
         let right = build_table_i32(
             ("a", &vec![10, 20]),
@@ -2625,7 +2628,13 @@ mod tests {
             ("c", &vec![30, 40]),
         );
 
-        let left_data = (JoinHashMap(hashmap_left), left);
+        let left_data = (
+            JoinHashMap {
+                map: hashmap_left,
+                next,
+            },
+            left,
+        );
         let (l, r) = build_equal_condition_join_indices(
             &left_data.0,
             &left_data.1,
@@ -2635,7 +2644,6 @@ mod tests {
             &random_state,
             false,
             &mut vec![0; right.num_rows()],
-            None,
         )?;
 
         let mut left_ids = UInt64Builder::with_capacity(0);
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 992de86dfe..1b9cbd543d 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -36,24 +36,88 @@ use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
 use datafusion_common::Result;
 
 // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
-//
-// Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
-// to put the indices in a certain bucket.
 // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
 // we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
 // E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
 // As the key is a hash value, we need to check possible hash collisions in the probe stage
 // During this stage it might be the case that a row is contained the same hashmap value,
 // but the values don't match. Those are checked in the [equal_rows] macro
-// TODO: speed up collision check and move away from using a hashbrown HashMap
+// The indices (values) are kept in a separate chained list backed by the `next` `Vec<u64>`.
+// The head of each chain (index + 1) is stored in the hashmap; each following entry (also index + 1) is stored in `next` at the position of the entry before it.
+// The chain is followed until the value "0" is reached, which marks the end of the list.
+// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487)
+// See the example below:
+// Insert (1,1), i.e. (hash value, row index)
+// map:
+// ---------
+// | 1 | 2 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 0 | 0 |
+// ---------------------
+// Insert (2,2)
+// map:
+// ---------
+// | 1 | 2 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 0 | 0 |
+// ---------------------
+// Insert (1,3)
+// map:
+// ---------
+// | 1 | 4 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 2 | 0 |  <--- hash value 1 maps to 4,2 (i.e. row indices 3,1)
+// ---------------------
+// Insert (1,4)
+// map:
+// ---------
+// | 1 | 5 |
+// | 2 | 3 |
+// ---------
+// next:
+// ---------------------
+// | 0 | 0 | 0 | 2 | 4 |  <--- hash value 1 maps to 5,4,2 (i.e. row indices 4,3,1)
+// ---------------------
+
+// TODO: speed up collision checks
 // https://github.com/apache/arrow-datafusion/issues/50
-pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
+pub struct JoinHashMap {
+    // Stores hash value to first index
+    pub map: RawTable<(u64, u64)>,
+    // Stores indices in chained list data structure
+    pub next: Vec<u64>,
+}
+
+/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate
+/// and shrink the indices.
+pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
 
 impl JoinHashMap {
+    pub(crate) fn with_capacity(capacity: usize) -> Self {
+        JoinHashMap {
+            map: RawTable::with_capacity(capacity),
+            next: vec![0; capacity],
+        }
+    }
+}
+
+impl SymmetricJoinHashMap {
+    pub(crate) fn with_capacity(capacity: usize) -> Self {
+        Self(RawTable::with_capacity(capacity))
+    }
+
     /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
     /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
     /// when necessary. You can adjust the scale_factor value to achieve the desired
-    /// ,balance between memory usage and performance.
+    /// balance between memory usage and performance.
     //
     // If you increase the scale_factor, the capacity will shrink less aggressively,
     // leading to potentially higher memory usage but fewer resizes.
@@ -628,7 +692,7 @@ pub mod tests {
     #[test]
     fn test_shrink_if_necessary() {
         let scale_factor = 4;
-        let mut join_hash_map = JoinHashMap(RawTable::with_capacity(100));
+        let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100);
         let data_size = 2000;
         let deleted_part = 3 * data_size / 4;
         // Add elements to the JoinHashMap
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 7eac619687..f2b750c0b6 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -41,10 +41,15 @@ use arrow::array::{
 use arrow::compute::concat_batches;
 use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder};
+use arrow_array::{UInt32Array, UInt64Array};
+use datafusion_physical_expr::hash_utils::create_hashes;
+use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::{select, BoxStream};
 use futures::{Stream, StreamExt};
-use hashbrown::{raw::RawTable, HashSet};
+use hashbrown::HashSet;
 use parking_lot::Mutex;
+use smallvec::smallvec;
 
 use datafusion_common::{utils::bisect, ScalarValue};
 use datafusion_execution::memory_pool::MemoryConsumer;
@@ -52,12 +57,10 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalB
 
 use crate::physical_plan::common::SharedMemoryReservation;
 use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
-use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
 use crate::physical_plan::{
     expressions::Column,
     expressions::PhysicalSortExpr,
     joins::{
-        hash_join::{build_join_indices, update_hash},
         hash_join_utils::{build_filter_input_order, SortedFilterExpr},
         utils::{
             build_batch_from_indices, build_join_schema, check_join_is_valid,
@@ -73,6 +76,10 @@ use datafusion_common::JoinType;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::TaskContext;
 
+use super::hash_join::equal_rows;
+use super::hash_join_utils::SymmetricJoinHashMap;
+use super::utils::apply_join_filter_to_indices;
+
 const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
 
 /// A symmetric hash join with range conditions is when both streams are hashed on the
@@ -681,7 +688,7 @@ impl Stream for SymmetricHashJoinStream {
 
 fn prune_hash_values(
     prune_length: usize,
-    hashmap: &mut JoinHashMap,
+    hashmap: &mut SymmetricJoinHashMap,
     row_hash_values: &mut VecDeque<u64>,
     offset: u64,
 ) -> Result<()> {
@@ -1043,7 +1050,7 @@ struct OneSideHashJoiner {
     /// Columns from the side
     on: Vec<Column>,
     /// Hashmap
-    hashmap: JoinHashMap,
+    hashmap: SymmetricJoinHashMap,
     /// To optimize hash deleting in case of pruning, we hold them in memory
     row_hash_values: VecDeque<u64>,
     /// Reuse the hashes buffer
@@ -1076,7 +1083,7 @@ impl OneSideHashJoiner {
             build_side,
             input_buffer: RecordBatch::new_empty(schema),
             on,
-            hashmap: JoinHashMap(RawTable::with_capacity(0)),
+            hashmap: SymmetricJoinHashMap::with_capacity(0),
             row_hash_values: VecDeque::new(),
             hashes_buffer: vec![],
             visited_rows: HashSet::new(),
@@ -1085,6 +1092,39 @@ impl OneSideHashJoiner {
         }
     }
 
+    pub fn update_hash(
+        on: &[Column],
+        batch: &RecordBatch,
+        hash_map: &mut SymmetricJoinHashMap,
+        offset: usize,
+        random_state: &RandomState,
+        hashes_buffer: &mut Vec<u64>,
+    ) -> Result<()> {
+        // evaluate the keys
+        let keys_values = on
+            .iter()
+            .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        // calculate the hash values
+        let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+        // insert hashes to key of the hashmap
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            let item = hash_map
+                .0
+                .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+            if let Some((_, indices)) = item {
+                indices.push((row + offset) as u64);
+            } else {
+                hash_map.0.insert(
+                    *hash_value,
+                    (*hash_value, smallvec![(row + offset) as u64]),
+                    |(hash, _)| *hash,
+                );
+            }
+        }
+        Ok(())
+    }
+
     /// Updates the internal state of the [OneSideHashJoiner] with the incoming batch.
     ///
     /// # Arguments
@@ -1106,7 +1146,7 @@ impl OneSideHashJoiner {
         self.hashes_buffer.resize(batch.num_rows(), 0);
         // Get allocation_info before adding the item
         // Update the hashmap with the join key values and hashes of the incoming batch:
-        update_hash(
+        Self::update_hash(
             &self.on,
             batch,
             &mut self.hashmap,
@@ -1119,6 +1159,144 @@ impl OneSideHashJoiner {
         Ok(())
     }
 
+    /// Gets build and probe indices which satisfy the on condition (including
+    /// the equality condition and the join filter) in the join.
+    #[allow(clippy::too_many_arguments)]
+    pub fn build_join_indices(
+        probe_batch: &RecordBatch,
+        build_hashmap: &SymmetricJoinHashMap,
+        build_input_buffer: &RecordBatch,
+        on_build: &[Column],
+        on_probe: &[Column],
+        filter: Option<&JoinFilter>,
+        random_state: &RandomState,
+        null_equals_null: bool,
+        hashes_buffer: &mut Vec<u64>,
+        offset: Option<usize>,
+        build_side: JoinSide,
+    ) -> Result<(UInt64Array, UInt32Array)> {
+        // Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
+        let (build_indices, probe_indices) = Self::build_equal_condition_join_indices(
+            build_hashmap,
+            build_input_buffer,
+            probe_batch,
+            on_build,
+            on_probe,
+            random_state,
+            null_equals_null,
+            hashes_buffer,
+            offset,
+        )?;
+        if let Some(filter) = filter {
+            // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
+            apply_join_filter_to_indices(
+                build_input_buffer,
+                probe_batch,
+                build_indices,
+                probe_indices,
+                filter,
+                build_side,
+            )
+        } else {
+            Ok((build_indices, probe_indices))
+        }
+    }
+
+    // Returns build/probe indices satisfying the equality condition.
+    // On LEFT.b1 = RIGHT.b2
+    // LEFT Table:
+    //  a1  b1  c1
+    //  1   1   10
+    //  3   3   30
+    //  5   5   50
+    //  7   7   70
+    //  9   8   90
+    //  11  8   110
+    // 13   10  130
+    // RIGHT Table:
+    //  a2   b2  c2
+    //  2    2   20
+    //  4    4   40
+    //  6    6   60
+    //  8    8   80
+    // 10   10  100
+    // 12   10  120
+    // The result is
+    // "+----+----+-----+----+----+-----+",
+    // "| a1 | b1 | c1  | a2 | b2 | c2  |",
+    // "+----+----+-----+----+----+-----+",
+    // "| 11 | 8  | 110 | 8  | 8  | 80  |",
+    // "| 13 | 10 | 130 | 10 | 10 | 100 |",
+    // "| 13 | 10 | 130 | 12 | 10 | 120 |",
+    // "| 9  | 8  | 90  | 8  | 8  | 80  |",
+    // "+----+----+-----+----+----+-----+"
+    // And the result of build and probe indices are:
+    // Build indices:  5, 6, 6, 4
+    // Probe indices: 3, 4, 5, 3
+    #[allow(clippy::too_many_arguments)]
+    pub fn build_equal_condition_join_indices(
+        build_hashmap: &SymmetricJoinHashMap,
+        build_input_buffer: &RecordBatch,
+        probe_batch: &RecordBatch,
+        build_on: &[Column],
+        probe_on: &[Column],
+        random_state: &RandomState,
+        null_equals_null: bool,
+        hashes_buffer: &mut Vec<u64>,
+        offset: Option<usize>,
+    ) -> Result<(UInt64Array, UInt32Array)> {
+        let keys_values = probe_on
+            .iter()
+            .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        let build_join_values = build_on
+            .iter()
+            .map(|c| {
+                Ok(c.evaluate(build_input_buffer)?
+                    .into_array(build_input_buffer.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        hashes_buffer.clear();
+        hashes_buffer.resize(probe_batch.num_rows(), 0);
+        let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+        // Using a buffer builder to avoid slower normal builder
+        let mut build_indices = UInt64BufferBuilder::new(0);
+        let mut probe_indices = UInt32BufferBuilder::new(0);
+        let offset_value = offset.unwrap_or(0);
+        // Visit all of the probe rows
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            // Get the hash and find it in the build index
+            // For every item on the build and probe we check if it matches
+            // This possibly contains rows with hash collisions,
+            // So we have to check here whether rows are equal or not
+            if let Some((_, indices)) = build_hashmap
+                .0
+                .get(*hash_value, |(hash, _)| *hash_value == *hash)
+            {
+                for &i in indices {
+                    // Subtract the offset to get the row position within the build input buffer
+                    let offset_build_index = i as usize - offset_value;
+                    // Check hash collisions
+                    if equal_rows(
+                        offset_build_index,
+                        row,
+                        &build_join_values,
+                        &keys_values,
+                        null_equals_null,
+                    )? {
+                        build_indices.append(offset_build_index as u64);
+                        probe_indices.append(row as u32);
+                    }
+                }
+            }
+        }
+
+        Ok((
+            PrimitiveArray::new(build_indices.finish().into(), None),
+            PrimitiveArray::new(probe_indices.finish().into(), None),
+        ))
+    }
+
     /// This method performs a join between the build side input buffer and the probe side batch.
     ///
     /// # Arguments
@@ -1155,7 +1333,7 @@ impl OneSideHashJoiner {
         if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
             return Ok(None);
         }
-        let (build_indices, probe_indices) = build_join_indices(
+        let (build_indices, probe_indices) = Self::build_join_indices(
             probe_batch,
             &self.hashmap,
             &self.input_buffer,