You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/12/30 14:09:04 UTC
(arrow-datafusion) branch main updated: refactor: modified `JoinHashMap` build order for `HashJoinStream` (#8658)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 00a679a053 refactor: modified `JoinHashMap` build order for `HashJoinStream` (#8658)
00a679a053 is described below
commit 00a679a0533f1f878db43c2a9cdcaa2e92ab859e
Author: Eduard Karacharov <13...@users.noreply.github.com>
AuthorDate: Sat Dec 30 16:08:59 2023 +0200
refactor: modified `JoinHashMap` build order for `HashJoinStream` (#8658)
* maintaining fifo hashmap in hash join
* extended HashJoinExec docstring on build phase
* testcases for randomly ordered build side input
* trigger ci
---
datafusion/physical-plan/src/joins/hash_join.rs | 316 ++++++++++++++-------
.../physical-plan/src/joins/symmetric_hash_join.rs | 2 +
datafusion/physical-plan/src/joins/utils.rs | 78 ++++-
3 files changed, 300 insertions(+), 96 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 13ac06ee30..374a0ad507 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -29,7 +29,6 @@ use crate::joins::utils::{
need_produce_result_in_final, JoinHashMap, JoinHashMapType,
};
use crate::{
- coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
expressions::Column,
expressions::PhysicalSortExpr,
@@ -52,10 +51,10 @@ use super::{
use arrow::array::{
Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
- UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder,
+ UInt64Array,
};
use arrow::compute::kernels::cmp::{eq, not_distinct};
-use arrow::compute::{and, take, FilterBuilder};
+use arrow::compute::{and, concat_batches, take, FilterBuilder};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
@@ -156,8 +155,48 @@ impl JoinLeftData {
///
/// Execution proceeds in 2 stages:
///
-/// 1. the **build phase** where a hash table is created from the tuples of the
-/// build side.
+/// 1. the **build phase** creates a hash table from the tuples of the build side,
+/// and a single concatenated batch containing data from all fetched record batches.
+/// Resulting hash table stores hashed join-key fields for each row as a key, and
+/// indices of corresponding rows in concatenated batch.
+///
+/// Hash join uses a LIFO data structure as a hash table, and in order to retain
+/// the original build-side input order while obtaining data during the probe phase, the
+/// hash table is updated by iterating the batch sequence in reverse order -- this keeps
+/// rows with smaller indices "on the top" of the hash table, while still maintaining
+/// correct indexing for the concatenated build-side data batch.
+///
+/// Example of build phase for 3 record batches:
+///
+///
+/// ```text
+///
+/// Original build-side data Inserting build-side values into hashmap Concatenated build-side batch
+/// ┌───────────────────────────┐
hashmap.insert(row-hash, row-idx + offset) │ idx │
+/// ┌───────┐ │ ┌───────┐ │
+/// │ Row 1 │ 1) update_hash for batch 3 with offset 0 │ │ Row 6 │ 0 │
+/// Batch 1 │ │ - hashmap.insert(Row 7, idx 1) │ Batch 3 │ │ │
+/// │ Row 2 │ - hashmap.insert(Row 6, idx 0) │ │ Row 7 │ 1 │
+/// └───────┘ │ └───────┘ │
+/// │ │
+/// ┌───────┐ │ ┌───────┐ │
+/// │ Row 3 │ 2) update_hash for batch 2 with offset 2 │ │ Row 3 │ 2 │
+/// │ │ - hashmap.insert(Row 5, idx 4) │ │ │ │
+/// Batch 2 │ Row 4 │ - hashmap.insert(Row 4, idx 3) │ Batch 2 │ Row 4 │ 3 │
+/// │ │ - hashmap.insert(Row 3, idx 2) │ │ │ │
+/// │ Row 5 │ │ │ Row 5 │ 4 │
+/// └───────┘ │ └───────┘ │
+/// │ │
+/// ┌───────┐ │ ┌───────┐ │
+/// │ Row 6 │ 3) update_hash for batch 1 with offset 5 │ │ Row 1 │ 5 │
+/// Batch 3 │ │ - hashmap.insert(Row 2, idx 5) │ Batch 1 │ │ │
+/// │ Row 7 │ - hashmap.insert(Row 1, idx 6) │ │ Row 2 │ 6 │
+/// └───────┘ │ └───────┘ │
+/// │ │
+/// └───────────────────────────┘
+///
+/// ```
///
/// 2. the **probe phase** where the tuples of the probe side are streamed
/// through, checking for matches of the join keys in the hash table.
@@ -715,7 +754,10 @@ async fn collect_left_input(
let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
let mut offset = 0;
- for batch in batches.iter() {
+
+ // Updating hashmap starting from the last batch
+ let batches_iter = batches.iter().rev();
+ for batch in batches_iter.clone() {
hashes_buffer.clear();
hashes_buffer.resize(batch.num_rows(), 0);
update_hash(
@@ -726,19 +768,25 @@ async fn collect_left_input(
&random_state,
&mut hashes_buffer,
0,
+ true,
)?;
offset += batch.num_rows();
}
// Merge all batches into a single batch, so we
// can directly index into the arrays
- let single_batch = concat_batches(&schema, &batches, num_rows)?;
+ let single_batch = concat_batches(&schema, batches_iter)?;
let data = JoinLeftData::new(hashmap, single_batch, reservation);
Ok(data)
}
-/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
-/// assuming that the [RecordBatch] corresponds to the `index`th
+/// Updates `hash_map` with new entries from `batch` evaluated against the expressions `on`
+/// using `offset` as a start value for `batch` row indices.
+///
+/// `fifo_hashmap` sets the order of iteration over `batch` rows while updating the hashmap,
+/// which allows keeping either the first (if set to true) or the last (if set to false) row index
+/// as the chain head for rows with equal hash values.
+#[allow(clippy::too_many_arguments)]
pub fn update_hash<T>(
on: &[Column],
batch: &RecordBatch,
@@ -747,6 +795,7 @@ pub fn update_hash<T>(
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
deleted_offset: usize,
+ fifo_hashmap: bool,
) -> Result<()>
where
T: JoinHashMapType,
@@ -763,28 +812,18 @@ where
// For usual JoinHashmap, the implementation is void.
hash_map.extend_zero(batch.num_rows());
- // insert hashes to key of the hashmap
- let (mut_map, mut_list) = hash_map.get_mut();
- for (row, hash_value) in hash_values.iter().enumerate() {
- let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
- if let Some((_, index)) = item {
- // Already exists: add index to next array
- let prev_index = *index;
- // Store new value inside hashmap
- *index = (row + offset + 1) as u64;
- // Update chained Vec at row + offset with previous value
- mut_list[row + offset - deleted_offset] = prev_index;
- } else {
- mut_map.insert(
- *hash_value,
- // store the value + 1 as 0 value reserved for end of list
- (*hash_value, (row + offset + 1) as u64),
- |(hash, _)| *hash,
- );
- // chained list at (row + offset) is already initialized with 0
- // meaning end of list
- }
+ // Updating JoinHashMap from hash values iterator
+ let hash_values_iter = hash_values
+ .iter()
+ .enumerate()
+ .map(|(i, val)| (i + offset, val));
+
+ if fifo_hashmap {
+ hash_map.update_from_iter(hash_values_iter.rev(), deleted_offset);
+ } else {
+ hash_map.update_from_iter(hash_values_iter, deleted_offset);
}
+
Ok(())
}
@@ -987,6 +1026,7 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
filter: Option<&JoinFilter>,
build_side: JoinSide,
deleted_offset: Option<usize>,
+ fifo_hashmap: bool,
) -> Result<(UInt64Array, UInt32Array)> {
let keys_values = probe_on
.iter()
@@ -1002,10 +1042,9 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
hashes_buffer.clear();
hashes_buffer.resize(probe_batch.num_rows(), 0);
let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
- // Using a buffer builder to avoid slower normal builder
- let mut build_indices = UInt64BufferBuilder::new(0);
- let mut probe_indices = UInt32BufferBuilder::new(0);
- // The chained list algorithm generates build indices for each probe row in a reversed sequence as such:
+
+ // In case the build-side input has not been inverted during JoinHashMap creation, the chained list algorithm
+ // will return build indices for each probe row in reverse order, as such:
// Build Indices: [5, 4, 3]
// Probe Indices: [1, 1, 1]
//
@@ -1034,44 +1073,17 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
// (5,1)
//
// With this approach, the lexicographic order on both the probe side and the build side is preserved.
- let hash_map = build_hashmap.get_map();
- let next_chain = build_hashmap.get_list();
- for (row, hash_value) in hash_values.iter().enumerate().rev() {
- // Get the hash and find it in the build index
-
- // For every item on the build and probe we check if it matches
- // This possibly contains rows with hash collisions,
- // So we have to check here whether rows are equal or not
- if let Some((_, index)) =
- hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
- {
- let mut i = *index - 1;
- loop {
- let build_row_value = if let Some(offset) = deleted_offset {
- // This arguments means that we prune the next index way before here.
- if i < offset as u64 {
- // End of the list due to pruning
- break;
- }
- i - offset as u64
- } else {
- i
- };
- build_indices.append(build_row_value);
- probe_indices.append(row as u32);
- // Follow the chain to get the next index value
- let next = next_chain[build_row_value as usize];
- if next == 0 {
- // end of list
- break;
- }
- i = next - 1;
- }
- }
- }
- // Reversing both sets of indices
- build_indices.as_slice_mut().reverse();
- probe_indices.as_slice_mut().reverse();
+ let (mut probe_indices, mut build_indices) = if fifo_hashmap {
+ build_hashmap.get_matched_indices(hash_values.iter().enumerate(), deleted_offset)
+ } else {
+ let (mut matched_probe, mut matched_build) = build_hashmap
+ .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset);
+
+ matched_probe.as_slice_mut().reverse();
+ matched_build.as_slice_mut().reverse();
+
+ (matched_probe, matched_build)
+ };
let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), None);
let right: UInt32Array = PrimitiveArray::new(probe_indices.finish().into(), None);
@@ -1279,6 +1291,7 @@ impl HashJoinStream {
self.filter.as_ref(),
JoinSide::Left,
None,
+ true,
);
let result = match left_right_indices {
@@ -1393,7 +1406,9 @@ mod tests {
use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
- use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
+ use datafusion_common::{
+ assert_batches_eq, assert_batches_sorted_eq, assert_contains, ScalarValue,
+ };
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::Operator;
@@ -1558,7 +1573,9 @@ mod tests {
"| 3 | 5 | 9 | 20 | 5 | 80 |",
"+----+----+----+----+----+----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+
+ // Inner join output is expected to preserve both inputs order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -1640,7 +1657,48 @@ mod tests {
"+----+----+----+----+----+----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+ // Inner join output is expected to preserve both inputs order
+ assert_batches_eq!(expected, &batches);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn join_inner_one_randomly_ordered() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let left = build_table(
+ ("a1", &vec![0, 3, 2, 1]),
+ ("b1", &vec![4, 5, 5, 4]),
+ ("c1", &vec![6, 9, 8, 7]),
+ );
+ let right = build_table(
+ ("a2", &vec![20, 30, 10]),
+ ("b2", &vec![5, 6, 4]),
+ ("c2", &vec![80, 90, 70]),
+ );
+ let on = vec![(
+ Column::new_with_schema("b1", &left.schema())?,
+ Column::new_with_schema("b2", &right.schema())?,
+ )];
+
+ let (columns, batches) =
+ join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?;
+
+ assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
+
+ let expected = [
+ "+----+----+----+----+----+----+",
+ "| a1 | b1 | c1 | a2 | b2 | c2 |",
+ "+----+----+----+----+----+----+",
+ "| 3 | 5 | 9 | 20 | 5 | 80 |",
+ "| 2 | 5 | 8 | 20 | 5 | 80 |",
+ "| 0 | 4 | 6 | 10 | 4 | 70 |",
+ "| 1 | 4 | 7 | 10 | 4 | 70 |",
+ "+----+----+----+----+----+----+",
+ ];
+
+ // Inner join output is expected to preserve both inputs order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -1686,7 +1744,8 @@ mod tests {
"+----+----+----+----+----+----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+ // Inner join output is expected to preserve both inputs order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -1740,7 +1799,58 @@ mod tests {
"+----+----+----+----+----+----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+ // Inner join output is expected to preserve both inputs order
+ assert_batches_eq!(expected, &batches);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn join_inner_one_two_parts_left_randomly_ordered() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let batch1 = build_table_i32(
+ ("a1", &vec![0, 3]),
+ ("b1", &vec![4, 5]),
+ ("c1", &vec![6, 9]),
+ );
+ let batch2 = build_table_i32(
+ ("a1", &vec![2, 1]),
+ ("b1", &vec![5, 4]),
+ ("c1", &vec![8, 7]),
+ );
+ let schema = batch1.schema();
+
+ let left = Arc::new(
+ MemoryExec::try_new(&[vec![batch1], vec![batch2]], schema, None).unwrap(),
+ );
+ let right = build_table(
+ ("a2", &vec![20, 30, 10]),
+ ("b2", &vec![5, 6, 4]),
+ ("c2", &vec![80, 90, 70]),
+ );
+ let on = vec![(
+ Column::new_with_schema("b1", &left.schema())?,
+ Column::new_with_schema("b2", &right.schema())?,
+ )];
+
+ let (columns, batches) =
+ join_collect(left, right, on, &JoinType::Inner, false, task_ctx).await?;
+
+ assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
+
+ let expected = [
+ "+----+----+----+----+----+----+",
+ "| a1 | b1 | c1 | a2 | b2 | c2 |",
+ "+----+----+----+----+----+----+",
+ "| 3 | 5 | 9 | 20 | 5 | 80 |",
+ "| 2 | 5 | 8 | 20 | 5 | 80 |",
+ "| 0 | 4 | 6 | 10 | 4 | 70 |",
+ "| 1 | 4 | 7 | 10 | 4 | 70 |",
+ "+----+----+----+----+----+----+",
+ ];
+
+ // Inner join output is expected to preserve both inputs order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -1789,7 +1899,9 @@ mod tests {
"| 1 | 4 | 7 | 10 | 4 | 70 |",
"+----+----+----+----+----+----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+
+ // Inner join output is expected to preserve both inputs order
+ assert_batches_eq!(expected, &batches);
// second part
let stream = join.execute(1, task_ctx.clone())?;
@@ -1804,7 +1916,8 @@ mod tests {
"+----+----+----+----+----+----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+ // Inner join output is expected to preserve both inputs order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -2228,12 +2341,14 @@ mod tests {
"+----+----+-----+",
"| a2 | b2 | c2 |",
"+----+----+-----+",
- "| 10 | 10 | 100 |",
- "| 12 | 10 | 40 |",
"| 8 | 8 | 20 |",
+ "| 12 | 10 | 40 |",
+ "| 10 | 10 | 100 |",
"+----+----+-----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+
+ // RightSemi join output is expected to preserve right input order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -2288,12 +2403,14 @@ mod tests {
"+----+----+-----+",
"| a2 | b2 | c2 |",
"+----+----+-----+",
- "| 10 | 10 | 100 |",
- "| 12 | 10 | 40 |",
"| 8 | 8 | 20 |",
+ "| 12 | 10 | 40 |",
+ "| 10 | 10 | 100 |",
"+----+----+-----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+
+ // RightSemi join output is expected to preserve right input order
+ assert_batches_eq!(expected, &batches);
// left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9
let filter_expression = Arc::new(BinaryExpr::new(
@@ -2314,11 +2431,13 @@ mod tests {
"+----+----+-----+",
"| a2 | b2 | c2 |",
"+----+----+-----+",
- "| 10 | 10 | 100 |",
"| 12 | 10 | 40 |",
+ "| 10 | 10 | 100 |",
"+----+----+-----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+
+ // RightSemi join output is expected to preserve right input order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -2471,12 +2590,14 @@ mod tests {
"+----+----+-----+",
"| a2 | b2 | c2 |",
"+----+----+-----+",
+ "| 6 | 6 | 60 |",
"| 2 | 2 | 80 |",
"| 4 | 4 | 120 |",
- "| 6 | 6 | 60 |",
"+----+----+-----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+
+ // RightAnti join output is expected to preserve right input order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -2529,14 +2650,16 @@ mod tests {
"+----+----+-----+",
"| a2 | b2 | c2 |",
"+----+----+-----+",
- "| 10 | 10 | 100 |",
"| 12 | 10 | 40 |",
+ "| 6 | 6 | 60 |",
"| 2 | 2 | 80 |",
+ "| 10 | 10 | 100 |",
"| 4 | 4 | 120 |",
- "| 6 | 6 | 60 |",
"+----+----+-----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+
+ // RightAnti join output is expected to preserve right input order
+ assert_batches_eq!(expected, &batches);
// left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8
let column_indices = vec![ColumnIndex {
@@ -2565,13 +2688,15 @@ mod tests {
"+----+----+-----+",
"| a2 | b2 | c2 |",
"+----+----+-----+",
+ "| 8 | 8 | 20 |",
+ "| 6 | 6 | 60 |",
"| 2 | 2 | 80 |",
"| 4 | 4 | 120 |",
- "| 6 | 6 | 60 |",
- "| 8 | 8 | 20 |",
"+----+----+-----+",
];
- assert_batches_sorted_eq!(expected, &batches);
+
+ // RightAnti join output is expected to preserve right input order
+ assert_batches_eq!(expected, &batches);
Ok(())
}
@@ -2734,6 +2859,7 @@ mod tests {
None,
JoinSide::Left,
None,
+ false,
)?;
let mut left_ids = UInt64Builder::with_capacity(0);
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index f071a7f601..2d38c2bd16 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -771,6 +771,7 @@ pub(crate) fn join_with_probe_batch(
filter,
build_hash_joiner.build_side,
Some(build_hash_joiner.deleted_offset),
+ false,
)?;
if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
record_visited_indices(
@@ -883,6 +884,7 @@ impl OneSideHashJoiner {
random_state,
&mut self.hashes_buffer,
self.deleted_offset,
+ false,
)?;
Ok(())
}
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index ac805b50e6..1e3cf5abb4 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -30,7 +30,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Partitioning, Statistics};
use arrow::array::{
downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
- UInt32Builder, UInt64Array,
+ UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
};
use arrow::compute;
use arrow::datatypes::{Field, Schema, SchemaBuilder};
@@ -148,6 +148,82 @@ pub trait JoinHashMapType {
fn get_map(&self) -> &RawTable<(u64, u64)>;
/// Returns a reference to the next.
fn get_list(&self) -> &Self::NextType;
+
+ /// Updates the hashmap from an iterator of (row index, row hash) pairs.
+ fn update_from_iter<'a>(
+ &mut self,
+ iter: impl Iterator<Item = (usize, &'a u64)>,
+ deleted_offset: usize,
+ ) {
+ let (mut_map, mut_list) = self.get_mut();
+ for (row, hash_value) in iter {
+ let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+ if let Some((_, index)) = item {
+ // Already exists: add index to next array
+ let prev_index = *index;
+ // Store new value inside hashmap
+ *index = (row + 1) as u64;
+ // Update chained Vec at `row` with previous value
+ mut_list[row - deleted_offset] = prev_index;
+ } else {
+ mut_map.insert(
+ *hash_value,
+ // store the value + 1 as 0 value reserved for end of list
+ (*hash_value, (row + 1) as u64),
+ |(hash, _)| *hash,
+ );
+ // chained list at `row` is already initialized with 0
+ // meaning end of list
+ }
+ }
+ }
+
+ /// Returns all pairs of row indices matched by hash.
+ ///
+ /// This method only compares hashes, so an additional check for actual value
+ /// equality may be required.
+ fn get_matched_indices<'a>(
+ &self,
+ iter: impl Iterator<Item = (usize, &'a u64)>,
+ deleted_offset: Option<usize>,
+ ) -> (UInt32BufferBuilder, UInt64BufferBuilder) {
+ let mut input_indices = UInt32BufferBuilder::new(0);
+ let mut match_indices = UInt64BufferBuilder::new(0);
+
+ let hash_map = self.get_map();
+ let next_chain = self.get_list();
+ for (row_idx, hash_value) in iter {
+ // Get the hash and find it in the index
+ if let Some((_, index)) =
+ hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
+ {
+ let mut i = *index - 1;
+ loop {
+ let match_row_idx = if let Some(offset) = deleted_offset {
+ // This argument means that we prune the next index way before here.
+ if i < offset as u64 {
+ // End of the list due to pruning
+ break;
+ }
+ i - offset as u64
+ } else {
+ i
+ };
+ match_indices.append(match_row_idx);
+ input_indices.append(row_idx as u32);
+ // Follow the chain to get the next index value
+ let next = next_chain[match_row_idx as usize];
+ if next == 0 {
+ // end of list
+ break;
+ }
+ i = next - 1;
+ }
+ }
+ }
+
+ (input_indices, match_indices)
+ }
}
/// Implementation of `JoinHashMapType` for `JoinHashMap`.