You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/30 14:09:04 UTC

(arrow-datafusion) branch main updated: refactor: modified `JoinHashMap` build order for `HashJoinStream` (#8658)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 00a679a053 refactor: modified `JoinHashMap` build order for `HashJoinStream` (#8658)
00a679a053 is described below

commit 00a679a0533f1f878db43c2a9cdcaa2e92ab859e
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Sat Dec 30 16:08:59 2023 +0200

    refactor: modified `JoinHashMap` build order for `HashJoinStream` (#8658)
    
    * maintaining fifo hashmap in hash join
    
    * extended HashJoinExec docstring on build phase
    
    * testcases for randomly ordered build side input
    
    * trigger ci
---
 datafusion/physical-plan/src/joins/hash_join.rs    | 316 ++++++++++++++-------
 .../physical-plan/src/joins/symmetric_hash_join.rs |   2 +
 datafusion/physical-plan/src/joins/utils.rs        |  78 ++++-
 3 files changed, 300 insertions(+), 96 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 13ac06ee30..374a0ad507 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -29,7 +29,6 @@ use crate::joins::utils::{
     need_produce_result_in_final, JoinHashMap, JoinHashMapType,
 };
 use crate::{
-    coalesce_batches::concat_batches,
     coalesce_partitions::CoalescePartitionsExec,
     expressions::Column,
     expressions::PhysicalSortExpr,
@@ -52,10 +51,10 @@ use super::{
 
 use arrow::array::{
     Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
-    UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder,
+    UInt64Array,
 };
 use arrow::compute::kernels::cmp::{eq, not_distinct};
-use arrow::compute::{and, take, FilterBuilder};
+use arrow::compute::{and, concat_batches, take, FilterBuilder};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util;
@@ -156,8 +155,48 @@ impl JoinLeftData {
 ///
 /// Execution proceeds in 2 stages:
 ///
-/// 1. the **build phase** where a hash table is created from the tuples of the
-/// build side.
+/// 1. the **build phase** creates a hash table from the tuples of the build side,
+/// and a single concatenated batch containing data from all fetched record batches.
+/// The resulting hash table stores hashed join-key fields for each row as a key, and
+/// the indices of the corresponding rows in the concatenated batch.
+///
+/// Hash join uses a LIFO data structure as a hash table, and in order to retain
+/// the original build-side input order while obtaining data during the probe phase,
+/// the hash table is updated by iterating the batch sequence in reverse order -- this
+/// keeps rows with smaller indices "on the top" of the hash table, while still
+/// maintaining correct indexing for the concatenated build-side data batch.
+///
+/// Example of build phase for 3 record batches:
+///
+///
+/// ```text
+///
+///  Original build-side data   Inserting build-side values into hashmap    Concatenated build-side batch
+///                                                                         ┌───────────────────────────┐
+///                             hashmap.insert(row-hash, row-idx + offset)  │                      idx  │
+///            ┌───────┐                                                    │          ┌───────┐        │
+///            │ Row 1 │        1) update_hash for batch 3 with offset 0    │          │ Row 6 │    0   │
+///   Batch 1  │       │           - hashmap.insert(Row 7, idx 1)           │ Batch 3  │       │        │
+///            │ Row 2 │           - hashmap.insert(Row 6, idx 0)           │          │ Row 7 │    1   │
+///            └───────┘                                                    │          └───────┘        │
+///                                                                         │                           │
+///            ┌───────┐                                                    │          ┌───────┐        │
+///            │ Row 3 │        2) update_hash for batch 2 with offset 2    │          │ Row 3 │    2   │
+///            │       │           - hashmap.insert(Row 5, idx 4)           │          │       │        │
+///   Batch 2  │ Row 4 │           - hashmap.insert(Row 4, idx 3)           │ Batch 2  │ Row 4 │    3   │
+///            │       │           - hashmap.insert(Row 3, idx 2)           │          │       │        │
+///            │ Row 5 │                                                    │          │ Row 5 │    4   │
+///            └───────┘                                                    │          └───────┘        │
+///                                                                         │                           │
+///            ┌───────┐                                                    │          ┌───────┐        │
+///            │ Row 6 │        3) update_hash for batch 1 with offset 5    │          │ Row 1 │    5   │
+///   Batch 3  │       │           - hashmap.insert(Row 2, idx 6)           │ Batch 1  │       │        │
+///            │ Row 7 │           - hashmap.insert(Row 1, idx 5)           │          │ Row 2 │    6   │
+///            └───────┘                                                    │          └───────┘        │
+///                                                                         │                           │
+///                                                                         └───────────────────────────┘
+///
+/// ```
 ///
 /// 2. the **probe phase** where the tuples of the probe side are streamed
 /// through, checking for matches of the join keys in the hash table.
@@ -715,7 +754,10 @@ async fn collect_left_input(
     let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();
     let mut offset = 0;
-    for batch in batches.iter() {
+
+    // Updating hashmap starting from the last batch
+    let batches_iter = batches.iter().rev();
+    for batch in batches_iter.clone() {
         hashes_buffer.clear();
         hashes_buffer.resize(batch.num_rows(), 0);
         update_hash(
@@ -726,19 +768,25 @@ async fn collect_left_input(
             &random_state,
             &mut hashes_buffer,
             0,
+            true,
         )?;
         offset += batch.num_rows();
     }
     // Merge all batches into a single batch, so we
     // can directly index into the arrays
-    let single_batch = concat_batches(&schema, &batches, num_rows)?;
+    let single_batch = concat_batches(&schema, batches_iter)?;
     let data = JoinLeftData::new(hashmap, single_batch, reservation);
 
     Ok(data)
 }
 
-/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
-/// assuming that the [RecordBatch] corresponds to the `index`th
+/// Updates `hash_map` with new entries from `batch` evaluated against the expressions `on`
+/// using `offset` as a start value for `batch` row indices.
+///
+/// `fifo_hashmap` sets the order of iteration over `batch` rows while updating the hashmap,
+/// which makes it possible to keep either the first (if set to true) or the last (if set to
+/// false) row index as the chain head for rows with equal hash values.
+#[allow(clippy::too_many_arguments)]
 pub fn update_hash<T>(
     on: &[Column],
     batch: &RecordBatch,
@@ -747,6 +795,7 @@ pub fn update_hash<T>(
     random_state: &RandomState,
     hashes_buffer: &mut Vec<u64>,
     deleted_offset: usize,
+    fifo_hashmap: bool,
 ) -> Result<()>
 where
     T: JoinHashMapType,
@@ -763,28 +812,18 @@ where
     // For usual JoinHashmap, the implementation is void.
     hash_map.extend_zero(batch.num_rows());
 
-    // insert hashes to key of the hashmap
-    let (mut_map, mut_list) = hash_map.get_mut();
-    for (row, hash_value) in hash_values.iter().enumerate() {
-        let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
-        if let Some((_, index)) = item {
-            // Already exists: add index to next array
-            let prev_index = *index;
-            // Store new value inside hashmap
-            *index = (row + offset + 1) as u64;
-            // Update chained Vec at row + offset with previous value
-            mut_list[row + offset - deleted_offset] = prev_index;
-        } else {
-            mut_map.insert(
-                *hash_value,
-                // store the value + 1 as 0 value reserved for end of list
-                (*hash_value, (row + offset + 1) as u64),
-                |(hash, _)| *hash,
-            );
-            // chained list at (row + offset) is already initialized with 0
-            // meaning end of list
-        }
+    // Updating JoinHashMap from hash values iterator
+    let hash_values_iter = hash_values
+        .iter()
+        .enumerate()
+        .map(|(i, val)| (i + offset, val));
+
+    if fifo_hashmap {
+        hash_map.update_from_iter(hash_values_iter.rev(), deleted_offset);
+    } else {
+        hash_map.update_from_iter(hash_values_iter, deleted_offset);
     }
+
     Ok(())
 }
 
@@ -987,6 +1026,7 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
     filter: Option<&JoinFilter>,
     build_side: JoinSide,
     deleted_offset: Option<usize>,
+    fifo_hashmap: bool,
 ) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = probe_on
         .iter()
@@ -1002,10 +1042,9 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
     hashes_buffer.clear();
     hashes_buffer.resize(probe_batch.num_rows(), 0);
     let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
-    // Using a buffer builder to avoid slower normal builder
-    let mut build_indices = UInt64BufferBuilder::new(0);
-    let mut probe_indices = UInt32BufferBuilder::new(0);
-    // The chained list algorithm generates build indices for each probe row in a reversed sequence as such:
+
+    // If the build-side input has not been inverted during JoinHashMap creation, the chained list
+    // algorithm will return build indices for each probe row in reverse order, as such:
     // Build Indices: [5, 4, 3]
     // Probe Indices: [1, 1, 1]
     //
@@ -1034,44 +1073,17 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
     //     (5,1)
     //
     // With this approach, the lexicographic order on both the probe side and the build side is preserved.
-    let hash_map = build_hashmap.get_map();
-    let next_chain = build_hashmap.get_list();
-    for (row, hash_value) in hash_values.iter().enumerate().rev() {
-        // Get the hash and find it in the build index
-
-        // For every item on the build and probe we check if it matches
-        // This possibly contains rows with hash collisions,
-        // So we have to check here whether rows are equal or not
-        if let Some((_, index)) =
-            hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
-        {
-            let mut i = *index - 1;
-            loop {
-                let build_row_value = if let Some(offset) = deleted_offset {
-                    // This arguments means that we prune the next index way before here.
-                    if i < offset as u64 {
-                        // End of the list due to pruning
-                        break;
-                    }
-                    i - offset as u64
-                } else {
-                    i
-                };
-                build_indices.append(build_row_value);
-                probe_indices.append(row as u32);
-                // Follow the chain to get the next index value
-                let next = next_chain[build_row_value as usize];
-                if next == 0 {
-                    // end of list
-                    break;
-                }
-                i = next - 1;
-            }
-        }
-    }
-    // Reversing both sets of indices
-    build_indices.as_slice_mut().reverse();
-    probe_indices.as_slice_mut().reverse();
+    let (mut probe_indices, mut build_indices) = if fifo_hashmap {
+        build_hashmap.get_matched_indices(hash_values.iter().enumerate(), deleted_offset)
+    } else {
+        let (mut matched_probe, mut matched_build) = build_hashmap
+            .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset);
+
+        matched_probe.as_slice_mut().reverse();
+        matched_build.as_slice_mut().reverse();
+
+        (matched_probe, matched_build)
+    };
 
     let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None);
     let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None);
@@ -1279,6 +1291,7 @@ impl HashJoinStream {
             self.filter.as_ref(),
             JoinSide::Left,
             None,
+            true,
         );
 
         let result = match left_right_indices {
@@ -1393,7 +1406,9 @@ mod tests {
 
     use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
     use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
+    use datafusion_common::{
+        assert_batches_eq, assert_batches_sorted_eq, assert_contains, ScalarValue,
+    };
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_expr::Operator;
@@ -1558,7 +1573,9 @@ mod tests {
             "| 3  | 5  | 9  | 20 | 5  | 80 |",
             "+----+----+----+----+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &batches);
+
+        // Inner join output is expected to preserve both inputs order
+        assert_batches_eq!(expected, &batches);
 
         Ok(())
     }
@@ -1640,7 +1657,48 @@ mod tests {
             "+----+----+----+----+----+----+",
         ];
 
-        assert_batches_sorted_eq!(expected, &batches);
+        // Inner join output is expected to preserve both inputs order
+        assert_batches_eq!(expected, &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn join_inner_one_randomly_ordered() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let left = build_table(
+            ("a1", &vec![0, 3, 2, 1]),
+            ("b1", &vec![4, 5, 5, 4]),
+            ("c1", &vec![6, 9, 8, 7]),
+        );
+        let right = build_table(
+            ("a2", &vec![20, 30, 10]),
+            ("b2", &vec![5, 6, 4]),
+            ("c2", &vec![80, 90, 70]),
+        );
+        let on = vec![(
+            Column::new_with_schema("b1", &left.schema())?,
+            Column::new_with_schema("b2", &right.schema())?,
+        )];
+
+        let (columns, batches) =
+            join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?;
+
+        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
+
+        let expected = [
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "| 3  | 5  | 9  | 20 | 5  | 80 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "| 0  | 4  | 6  | 10 | 4  | 70 |",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "+----+----+----+----+----+----+",
+        ];
+
+        // Inner join output is expected to preserve both inputs order
+        assert_batches_eq!(expected, &batches);
 
         Ok(())
     }
@@ -1686,7 +1744,8 @@ mod tests {
             "+----+----+----+----+----+----+",
         ];
 
-        assert_batches_sorted_eq!(expected, &batches);
+        // Inner join output is expected to preserve both inputs order
+        assert_batches_eq!(expected, &batches);
 
         Ok(())
     }
@@ -1740,7 +1799,58 @@ mod tests {
             "+----+----+----+----+----+----+",
         ];
 
-        assert_batches_sorted_eq!(expected, &batches);
+        // Inner join output is expected to preserve both inputs order
+        assert_batches_eq!(expected, &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let batch1 = build_table_i32(
+            ("a1", &vec![0, 3]),
+            ("b1", &vec![4, 5]),
+            ("c1", &vec![6, 9]),
+        );
+        let batch2 = build_table_i32(
+            ("a1", &vec![2, 1]),
+            ("b1", &vec![5, 4]),
+            ("c1", &vec![8, 7]),
+        );
+        let schema = batch1.schema();
+
+        let left = Arc::new(
+            MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None).unwrap(),
+        );
+        let right = build_table(
+            ("a2", &vec![20, 30, 10]),
+            ("b2", &vec![5, 6, 4]),
+            ("c2", &vec![80, 90, 70]),
+        );
+        let on = vec![(
+            Column::new_with_schema("b1", &left.schema())?,
+            Column::new_with_schema("b2", &right.schema())?,
+        )];
+
+        let (columns, batches) =
+            join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?;
+
+        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
+
+        let expected = [
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "| 3  | 5  | 9  | 20 | 5  | 80 |",
+            "| 2  | 5  | 8  | 20 | 5  | 80 |",
+            "| 0  | 4  | 6  | 10 | 4  | 70 |",
+            "| 1  | 4  | 7  | 10 | 4  | 70 |",
+            "+----+----+----+----+----+----+",
+        ];
+
+        // Inner join output is expected to preserve both inputs order
+        assert_batches_eq!(expected, &batches);
 
         Ok(())
     }
@@ -1789,7 +1899,9 @@ mod tests {
             "| 1  | 4  | 7  | 10 | 4  | 70 |",
             "+----+----+----+----+----+----+",
         ];
-        assert_batches_sorted_eq!(expected, &batches);
+
+        // Inner join output is expected to preserve both inputs order
+        assert_batches_eq!(expected, &batches);
 
         // second part
         let stream = join.execute(1, task_ctx.clone())?;
@@ -1804,7 +1916,8 @@ mod tests {
             "+----+----+----+----+----+----+",
         ];
 
-        assert_batches_sorted_eq!(expected, &batches);
+        // Inner join output is expected to preserve both inputs order
+        assert_batches_eq!(expected, &batches);
 
         Ok(())
     }
@@ -2228,12 +2341,14 @@ mod tests {
             "+----+----+-----+",
             "| a2 | b2 | c2  |",
             "+----+----+-----+",
-            "| 10 | 10 | 100 |",
-            "| 12 | 10 | 40  |",
             "| 8  | 8  | 20  |",
+            "| 12 | 10 | 40  |",
+            "| 10 | 10 | 100 |",
             "+----+----+-----+",
         ];
-        assert_batches_sorted_eq!(expected, &batches);
+
+        // RightSemi join output is expected to preserve right input order
+        assert_batches_eq!(expected, &batches);
 
         Ok(())
     }
@@ -2288,12 +2403,14 @@ mod tests {
             "+----+----+-----+",
             "| a2 | b2 | c2  |",
             "+----+----+-----+",
-            "| 10 | 10 | 100 |",
-            "| 12 | 10 | 40  |",
             "| 8  | 8  | 20  |",
+            "| 12 | 10 | 40  |",
+            "| 10 | 10 | 100 |",
             "+----+----+-----+",
         ];
-        assert_batches_sorted_eq!(expected, &batches);
+
+        // RightSemi join output is expected to preserve right input order
+        assert_batches_eq!(expected, &batches);
 
         // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9
         let filter_expression = Arc::new(BinaryExpr::new(
@@ -2314,11 +2431,13 @@ mod tests {
             "+----+----+-----+",
             "| a2 | b2 | c2  |",
             "+----+----+-----+",
-            "| 10 | 10 | 100 |",
             "| 12 | 10 | 40  |",
+            "| 10 | 10 | 100 |",
             "+----+----+-----+",
         ];
-        assert_batches_sorted_eq!(expected, &batches);
+
+        // RightSemi join output is expected to preserve right input order
+        assert_batches_eq!(expected, &batches);
 
         Ok(())
     }
@@ -2471,12 +2590,14 @@ mod tests {
             "+----+----+-----+",
             "| a2 | b2 | c2  |",
             "+----+----+-----+",
+            "| 6  | 6  | 60  |",
             "| 2  | 2  | 80  |",
             "| 4  | 4  | 120 |",
-            "| 6  | 6  | 60  |",
             "+----+----+-----+",
         ];
-        assert_batches_sorted_eq!(expected, &batches);
+
+        // RightAnti join output is expected to preserve right input order
+        assert_batches_eq!(expected, &batches);
         Ok(())
     }
 
@@ -2529,14 +2650,16 @@ mod tests {
             "+----+----+-----+",
             "| a2 | b2 | c2  |",
             "+----+----+-----+",
-            "| 10 | 10 | 100 |",
             "| 12 | 10 | 40  |",
+            "| 6  | 6  | 60  |",
             "| 2  | 2  | 80  |",
+            "| 10 | 10 | 100 |",
             "| 4  | 4  | 120 |",
-            "| 6  | 6  | 60  |",
             "+----+----+-----+",
         ];
-        assert_batches_sorted_eq!(expected, &batches);
+
+        // RightAnti join output is expected to preserve right input order
+        assert_batches_eq!(expected, &batches);
 
         // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8
         let column_indices = vec![ColumnIndex {
@@ -2565,13 +2688,15 @@ mod tests {
             "+----+----+-----+",
             "| a2 | b2 | c2  |",
             "+----+----+-----+",
+            "| 8  | 8  | 20  |",
+            "| 6  | 6  | 60  |",
             "| 2  | 2  | 80  |",
             "| 4  | 4  | 120 |",
-            "| 6  | 6  | 60  |",
-            "| 8  | 8  | 20  |",
             "+----+----+-----+",
         ];
-        assert_batches_sorted_eq!(expected, &batches);
+
+        // RightAnti join output is expected to preserve right input order
+        assert_batches_eq!(expected, &batches);
 
         Ok(())
     }
@@ -2734,6 +2859,7 @@ mod tests {
             None,
             JoinSide::Left,
             None,
+            false,
         )?;
 
         let mut left_ids = UInt64Builder::with_capacity(0);
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index f071a7f601..2d38c2bd16 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -771,6 +771,7 @@ pub(crate) fn join_with_probe_batch(
         filter,
         build_hash_joiner.build_side,
         Some(build_hash_joiner.deleted_offset),
+        false,
     )?;
     if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
         record_visited_indices(
@@ -883,6 +884,7 @@ impl OneSideHashJoiner {
             random_state,
             &mut self.hashes_buffer,
             self.deleted_offset,
+            false,
         )?;
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index ac805b50e6..1e3cf5abb4 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -30,7 +30,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Partitioning, Statistics};
 
 use arrow::array::{
     downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
-    UInt32Builder, UInt64Array,
+    UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
 };
 use arrow::compute;
 use arrow::datatypes::{Field, Schema, SchemaBuilder};
@@ -148,6 +148,82 @@ pub trait JoinHashMapType {
     fn get_map(&self) -> &RawTable<(u64, u64)>;
     /// Returns a reference to the next.
     fn get_list(&self) -> &Self::NextType;
+
+    /// Updates hashmap from iterator of row indices & row hashes pairs.
+    fn update_from_iter<'a>(
+        &mut self,
+        iter: impl Iterator<Item = (usize, &'a u64)>,
+        deleted_offset: usize,
+    ) {
+        let (mut_map, mut_list) = self.get_mut();
+        for (row, hash_value) in iter {
+            let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+            if let Some((_, index)) = item {
+                // Already exists: add index to next array
+                let prev_index = *index;
+                // Store new value inside hashmap
+                *index = (row + 1) as u64;
+                // Update chained Vec at `row` with previous value
+                mut_list[row - deleted_offset] = prev_index;
+            } else {
+                mut_map.insert(
+                    *hash_value,
+                    // store the value + 1 as 0 value reserved for end of list
+                    (*hash_value, (row + 1) as u64),
+                    |(hash, _)| *hash,
+                );
+                // chained list at `row` is already initialized with 0
+                // meaning end of list
+            }
+        }
+    }
+
+    /// Returns all pairs of row indices matched by hash.
+    ///
+    /// This method only compares hashes, so a further check for actual value
+    /// equality may be required.
+    fn get_matched_indices<'a>(
+        &self,
+        iter: impl Iterator<Item = (usize, &'a u64)>,
+        deleted_offset: Option<usize>,
+    ) -> (UInt32BufferBuilder, UInt64BufferBuilder) {
+        let mut input_indices = UInt32BufferBuilder::new(0);
+        let mut match_indices = UInt64BufferBuilder::new(0);
+
+        let hash_map = self.get_map();
+        let next_chain = self.get_list();
+        for (row_idx, hash_value) in iter {
+            // Get the hash and find it in the index
+            if let Some((_, index)) =
+                hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
+            {
+                let mut i = *index - 1;
+                loop {
+                    let match_row_idx = if let Some(offset) = deleted_offset {
+                        // This argument means that we prune the next index way before here.
+                        if i < offset as u64 {
+                            // End of the list due to pruning
+                            break;
+                        }
+                        i - offset as u64
+                    } else {
+                        i
+                    };
+                    match_indices.append(match_row_idx);
+                    input_indices.append(row_idx as u32);
+                    // Follow the chain to get the next index value
+                    let next = next_chain[match_row_idx as usize];
+                    if next == 0 {
+                        // end of list
+                        break;
+                    }
+                    i = next - 1;
+                }
+            }
+        }
+
+        (input_indices, match_indices)
+    }
 }
 
 /// Implementation of `JoinHashMapType` for `JoinHashMap`.