Posted to commits@arrow.apache.org by dh...@apache.org on 2023/06/16 13:15:29 UTC

[arrow-datafusion] branch adapt_datastructure updated: Cleanup, fix symmetric hash join

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch adapt_datastructure
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/adapt_datastructure by this push:
     new 23f3acdbc6 Cleanup, fix symmetric hash join
23f3acdbc6 is described below

commit 23f3acdbc6f082e04484013927583746f37d10d9
Author: Daniël Heres <da...@coralogix.com>
AuthorDate: Fri Jun 16 15:15:23 2023 +0200

    Cleanup, fix symmetric hash join
---
 .../core/src/physical_plan/joins/hash_join.rs      |  17 +-
 .../src/physical_plan/joins/hash_join_utils.rs     |  36 ++--
 .../src/physical_plan/joins/symmetric_hash_join.rs | 220 ++++++++++++++++++---
 3 files changed, 219 insertions(+), 54 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 4ad71665ec..a2ced794cb 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -568,8 +568,7 @@ pub fn update_hash(
             let prev_index = *index;
             *index = (row + offset) as u64;
             // update chained Vec
-            hash_map.1[*index as usize] = prev_index;
-
+            hash_map.1[(*index - 1) as usize] = prev_index;
         } else {
             hash_map.0.insert(
                 *hash_value,
@@ -735,9 +734,9 @@ pub fn build_equal_condition_join_indices(
             .0
             .get(*hash_value, |(hash, _)| *hash_value == *hash)
         {
-            let mut i = *index;
+            let mut i = *index - 1;
             loop {
-                let offset_build_index = i as usize - offset_value - 1;
+                let offset_build_index = i as usize - offset_value;
                 // Check hash collisions
                 if equal_rows(
                     offset_build_index,
@@ -749,11 +748,11 @@ pub fn build_equal_condition_join_indices(
                     build_indices.append(offset_build_index as u64);
                     probe_indices.append(row as u32);
                 }
-                if build_hashmap.1[i as usize] != 0 {
-                    i = build_hashmap.1[i as usize];
-                } else {
+                if build_hashmap.1[i as usize] == 0 {
+                    // end of list
                     break;
                 }
+                i = build_hashmap.1[i as usize] - 1;
             }
         }
     }
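
For reference, a minimal, self-contained sketch of the one-based chained-list encoding that the updated traversal above assumes: the hashmap holds the most recently inserted index plus one for each hash, the `Vec<u64>` holds the previous link (again plus one), and 0 terminates the chain. std's `HashMap` stands in for hashbrown's `RawTable`, and the type name `ChainedMap` is made up for illustration:

    use std::collections::HashMap;

    /// Stand-in for the (RawTable, Vec<u64>) pair in JoinHashMap:
    /// `heads` maps hash -> one-based index of the last inserted row with that hash,
    /// `next[row]` holds the one-based index of the previous row in the chain (0 = end).
    struct ChainedMap {
        heads: HashMap<u64, u64>,
        next: Vec<u64>,
    }

    impl ChainedMap {
        fn with_rows(num_rows: usize) -> Self {
            Self { heads: HashMap::new(), next: vec![0; num_rows] }
        }

        /// Insert a row under the one-based encoding: store `row + 1` as the new head
        /// and chain the previous head (also one-based) through `next[row]`.
        fn insert(&mut self, hash: u64, row: usize) {
            let prev = self.heads.insert(hash, (row + 1) as u64).unwrap_or(0);
            self.next[row] = prev;
        }

        /// Mirrors the updated traversal in build_equal_condition_join_indices:
        /// subtract one from every stored link to recover the row index, stop at 0.
        fn rows_for(&self, hash: u64) -> Vec<usize> {
            let mut rows = Vec::new();
            if let Some(&head) = self.heads.get(&hash) {
                let mut i = head - 1;
                loop {
                    rows.push(i as usize);
                    if self.next[i as usize] == 0 {
                        break; // end of list
                    }
                    i = self.next[i as usize] - 1;
                }
            }
            rows
        }
    }

    fn main() {
        let mut map = ChainedMap::with_rows(3);
        map.insert(42, 0);
        map.insert(42, 2); // rows 0 and 2 share hash 42
        map.insert(7, 1);
        assert_eq!(map.rows_for(42), vec![2, 0]); // most recent first
        assert_eq!(map.rows_for(7), vec![1]);
    }

This is also why the test further down can drop the reserved first slot and use `next = vec![2, 0]`: the end-of-list sentinel is now the value 0 rather than a dedicated index.
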
@@ -831,7 +830,7 @@ macro_rules! equal_rows_elem_with_string_dict {
 /// Left and right row have equal values
 /// If more data types are supported here, please also add the data types in can_hash function
 /// to generate hash join logical plan.
-fn equal_rows(
+pub fn equal_rows(
     left: usize,
     right: usize,
     left_arrays: &[ArrayRef],
@@ -2620,7 +2619,7 @@ mod tests {
         hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h);
         hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h);
 
-        let next = vec![0, 2, 0];
+        let next = vec![2, 0];
 
         let right = build_table_i32(
             ("a", &vec![10, 20]),
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 59db05164d..28af042390 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -30,34 +30,44 @@ use datafusion_physical_expr::intervals::Interval;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use hashbrown::raw::RawTable;
+use smallvec::SmallVec;
 
 use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
 use datafusion_common::Result;
 
 // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
-// The indices (values) are stored in a separate chained list based on (index, next).
-// The first item in the list is reserved.
-// Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used
-// to put the indices in a certain bucket.
 // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
 // we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value.
 // E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1
 // As the key is a hash value, we need to check possible hash collisions in the probe stage
 // During this stage it might be the case that a row is contained the same hashmap value,
 // but the values don't match. Those are checked in the [equal_rows] macro
-// TODO: speed up collision check and move away from using a hashbrown HashMap
+// The indices (values) are stored as a separate chained list in the `Vec<u64>`.
+// The hashmap stores the first (most recently inserted) index of each list, and each `Vec<u64>` entry holds the next index in the chain.
+// A value of 0 means end of list.
+// TODO: speed up collision checks
 // https://github.com/apache/arrow-datafusion/issues/50
 pub struct JoinHashMap(pub RawTable<(u64, u64)>, pub Vec<u64>);
 
+/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate
+/// and shrink the indices.
+pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
+
 impl JoinHashMap {
-    pub(crate) fn with_capacity(capacity: usize) -> JoinHashMap {
-        JoinHashMap(RawTable::with_capacity(capacity), vec![0; capacity + 1])
-    } 
+    pub(crate) fn with_capacity(capacity: usize) -> Self {
+        JoinHashMap(RawTable::with_capacity(capacity), vec![0; capacity])
+    }
+}
+
+impl SymmetricJoinHashMap {
+    pub(crate) fn with_capacity(capacity: usize) -> Self {
+        Self(RawTable::with_capacity(capacity))
+    }
 
     /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
     /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
     /// when necessary. You can adjust the scale_factor value to achieve the desired
-    /// ,balance between memory usage and performance.
+    /// balance between memory usage and performance.
     //
     // If you increase the scale_factor, the capacity will shrink less aggressively,
     // leading to potentially higher memory usage but fewer resizes.
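
The hunk above introduces SymmetricJoinHashMap alongside JoinHashMap: instead of chaining row indices through a shared `Vec<u64>`, each hash maps directly to its own inline list of indices (a `SmallVec<[u64; 1]>`), which can be mutated and shrunk when rows are pruned. A minimal sketch of that layout, with std's `HashMap` standing in for hashbrown's `RawTable`, a plain `Vec` standing in for the `SmallVec`, and the type name `InlineIndexMap` made up for illustration:

    use std::collections::HashMap;

    /// Stand-in for SymmetricJoinHashMap: each hash maps directly to its list of
    /// row indices (the real code keeps the common single-entry case inline in a
    /// SmallVec<[u64; 1]>, avoiding a heap allocation per hash).
    struct InlineIndexMap(HashMap<u64, Vec<u64>>);

    impl InlineIndexMap {
        fn new() -> Self {
            Self(HashMap::new())
        }

        /// Mirrors the per-row insert in OneSideHashJoiner::update_hash:
        /// append to an existing list or start a new single-element one.
        fn insert(&mut self, hash: u64, row_index: u64) {
            self.0.entry(hash).or_default().push(row_index);
        }
    }

    fn main() {
        let mut map = InlineIndexMap::new();
        map.insert(42, 0);
        map.insert(42, 2);
        map.insert(7, 1);
        // Unlike the chained encoding, the indices for one hash are stored together
        // and can be mutated in place (e.g. pruned) without touching other entries.
        assert_eq!(map.0.get(&42), Some(&vec![0, 2]));
    }
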
@@ -71,11 +81,10 @@ impl JoinHashMap {
             let new_capacity = (capacity * (scale_factor - 1)) / scale_factor;
             self.0.shrink_to(new_capacity, |(hash, _)| *hash)
         }
-        // todo handle chained list
     }
 
     pub(crate) fn size(&self) -> usize {
-        self.0.allocation_info().1.size() + self.1.capacity() * 16 + 16
+        self.0.allocation_info().1.size()
     }
 }
 
@@ -295,6 +304,7 @@ pub mod tests {
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{binary, cast, col, lit};
+    use smallvec::smallvec;
     use std::sync::Arc;
 
     /// Filter expr for a + b > c + 10 AND a + b < c + 100
@@ -632,14 +642,14 @@ pub mod tests {
     #[test]
     fn test_shrink_if_necessary() {
         let scale_factor = 4;
-        let mut join_hash_map = JoinHashMap::with_capacity(100);
+        let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100);
         let data_size = 2000;
         let deleted_part = 3 * data_size / 4;
         // Add elements to the JoinHashMap
         for hash_value in 0..data_size {
             join_hash_map.0.insert(
                 hash_value,
-                (hash_value, hash_value),
+                (hash_value, smallvec![hash_value]),
                 |(hash, _)| *hash,
             );
         }
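
Regarding the `scale_factor` comment and the `shrink_to` call above: with the default factor of 4, the retained capacity is three quarters of the current one. A tiny sketch of just that arithmetic (the condition that triggers the shrink is not visible in the hunk, so it is omitted here):

    /// The shrink arithmetic from shrink_if_necessary: with scale_factor = 4 the
    /// retained capacity is 3/4 of the current one, i.e. a 25% reduction per shrink.
    fn shrunk_capacity(capacity: usize, scale_factor: usize) -> usize {
        (capacity * (scale_factor - 1)) / scale_factor
    }

    fn main() {
        assert_eq!(shrunk_capacity(2000, 4), 1500); // 25% reduction
        assert_eq!(shrunk_capacity(2000, 8), 1750); // larger factor shrinks less aggressively
    }
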
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 80ea9e65dc..f2b750c0b6 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -41,10 +41,15 @@ use arrow::array::{
 use arrow::compute::concat_batches;
 use arrow::datatypes::{ArrowNativeType, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder};
+use arrow_array::{UInt32Array, UInt64Array};
+use datafusion_physical_expr::hash_utils::create_hashes;
+use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::{select, BoxStream};
 use futures::{Stream, StreamExt};
-use hashbrown::{raw::RawTable, HashSet};
+use hashbrown::HashSet;
 use parking_lot::Mutex;
+use smallvec::smallvec;
 
 use datafusion_common::{utils::bisect, ScalarValue};
 use datafusion_execution::memory_pool::MemoryConsumer;
@@ -52,12 +57,10 @@ use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalB
 
 use crate::physical_plan::common::SharedMemoryReservation;
 use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
-use crate::physical_plan::joins::hash_join_utils::JoinHashMap;
 use crate::physical_plan::{
     expressions::Column,
     expressions::PhysicalSortExpr,
     joins::{
-        hash_join::{build_join_indices, update_hash},
         hash_join_utils::{build_filter_input_order, SortedFilterExpr},
         utils::{
             build_batch_from_indices, build_join_schema, check_join_is_valid,
@@ -73,6 +76,10 @@ use datafusion_common::JoinType;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::TaskContext;
 
+use super::hash_join::equal_rows;
+use super::hash_join_utils::SymmetricJoinHashMap;
+use super::utils::apply_join_filter_to_indices;
+
 const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
 
 /// A symmetric hash join with range conditions is when both streams are hashed on the
@@ -681,7 +688,7 @@ impl Stream for SymmetricHashJoinStream {
 
 fn prune_hash_values(
     prune_length: usize,
-    hashmap: &mut JoinHashMap,
+    hashmap: &mut SymmetricJoinHashMap,
     row_hash_values: &mut VecDeque<u64>,
     offset: u64,
 ) -> Result<()> {
@@ -702,30 +709,8 @@ fn prune_hash_values(
             .0
             .get_mut(*hash_value, |(hash, _)| hash_value == hash)
         {
-            let mut size = 0;
-            let mut i = separation_chain;
-            let mut prev_i = i;
-
-            let mut keep = false;
-
-            // TODO
-            // loop {
-            //     if !index_set.contains(i) {
-            //         if !keep {
-            //             *prev_i = i;
-            //         }
-            //         // retain this value
-            //         keep = true;
-            //         size += 1;
-            //     }
-            //     // drop value
-            //     *prev_i = i;
-
-            //     if *i == 0 {
-            //         break;
-            //     }
-            // }
-            if size == 0 {
+            separation_chain.retain(|n| !index_set.contains(n));
+            if separation_chain.is_empty() {
                 hashmap
                     .0
                     .remove_entry(*hash_value, |(hash, _)| hash_value == hash);
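
The commented-out TODO loop is replaced by a `retain` on the inline index list, followed by removing the hashmap entry once the list is empty. A minimal sketch of that pruning step, again with std collections standing in for `RawTable` and `SmallVec`:

    use std::collections::{HashMap, HashSet};

    /// Sketch of the new pruning step: drop pruned row indices from one hash's
    /// inline list and remove the map entry entirely once the list is empty.
    fn prune(map: &mut HashMap<u64, Vec<u64>>, hash: u64, pruned_rows: &HashSet<u64>) {
        if let Some(indices) = map.get_mut(&hash) {
            indices.retain(|n| !pruned_rows.contains(n));
            if indices.is_empty() {
                map.remove(&hash);
            }
        }
    }

    fn main() {
        let mut map = HashMap::from([(42u64, vec![0u64, 2, 5]), (7, vec![1])]);
        let pruned: HashSet<u64> = [0, 1].into_iter().collect();
        prune(&mut map, 42, &pruned);
        prune(&mut map, 7, &pruned);
        assert_eq!(map.get(&42), Some(&vec![2, 5]));
        assert!(!map.contains_key(&7)); // all of hash 7's rows were pruned
    }
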
@@ -1065,7 +1050,7 @@ struct OneSideHashJoiner {
     /// Columns from the side
     on: Vec<Column>,
     /// Hashmap
-    hashmap: JoinHashMap,
+    hashmap: SymmetricJoinHashMap,
     /// To optimize hash deleting in case of pruning, we hold them in memory
     row_hash_values: VecDeque<u64>,
     /// Reuse the hashes buffer
@@ -1098,7 +1083,7 @@ impl OneSideHashJoiner {
             build_side,
             input_buffer: RecordBatch::new_empty(schema),
             on,
-            hashmap: JoinHashMap::with_capacity(0),
+            hashmap: SymmetricJoinHashMap::with_capacity(0),
             row_hash_values: VecDeque::new(),
             hashes_buffer: vec![],
             visited_rows: HashSet::new(),
@@ -1107,6 +1092,39 @@ impl OneSideHashJoiner {
         }
     }
 
+    pub fn update_hash(
+        on: &[Column],
+        batch: &RecordBatch,
+        hash_map: &mut SymmetricJoinHashMap,
+        offset: usize,
+        random_state: &RandomState,
+        hashes_buffer: &mut Vec<u64>,
+    ) -> Result<()> {
+        // evaluate the keys
+        let keys_values = on
+            .iter()
+            .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        // calculate the hash values
+        let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+        // insert hashes to key of the hashmap
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            let item = hash_map
+                .0
+                .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+            if let Some((_, indices)) = item {
+                indices.push((row + offset) as u64);
+            } else {
+                hash_map.0.insert(
+                    *hash_value,
+                    (*hash_value, smallvec![(row + offset) as u64]),
+                    |(hash, _)| *hash,
+                );
+            }
+        }
+        Ok(())
+    }
+
     /// Updates the internal state of the [OneSideHashJoiner] with the incoming batch.
     ///
     /// # Arguments
@@ -1128,7 +1146,7 @@ impl OneSideHashJoiner {
         self.hashes_buffer.resize(batch.num_rows(), 0);
         // Get allocation_info before adding the item
         // Update the hashmap with the join key values and hashes of the incoming batch:
-        update_hash(
+        Self::update_hash(
             &self.on,
             batch,
             &mut self.hashmap,
@@ -1141,6 +1159,144 @@ impl OneSideHashJoiner {
         Ok(())
     }
 
+    /// Gets build and probe indices which satisfy the on condition (including
+    /// the equality condition and the join filter) in the join.
+    #[allow(clippy::too_many_arguments)]
+    pub fn build_join_indices(
+        probe_batch: &RecordBatch,
+        build_hashmap: &SymmetricJoinHashMap,
+        build_input_buffer: &RecordBatch,
+        on_build: &[Column],
+        on_probe: &[Column],
+        filter: Option<&JoinFilter>,
+        random_state: &RandomState,
+        null_equals_null: bool,
+        hashes_buffer: &mut Vec<u64>,
+        offset: Option<usize>,
+        build_side: JoinSide,
+    ) -> Result<(UInt64Array, UInt32Array)> {
+        // Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
+        let (build_indices, probe_indices) = Self::build_equal_condition_join_indices(
+            build_hashmap,
+            build_input_buffer,
+            probe_batch,
+            on_build,
+            on_probe,
+            random_state,
+            null_equals_null,
+            hashes_buffer,
+            offset,
+        )?;
+        if let Some(filter) = filter {
+            // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
+            apply_join_filter_to_indices(
+                build_input_buffer,
+                probe_batch,
+                build_indices,
+                probe_indices,
+                filter,
+                build_side,
+            )
+        } else {
+            Ok((build_indices, probe_indices))
+        }
+    }
+
+    // Returns build/probe indices satisfying the equality condition.
+    // On LEFT.b1 = RIGHT.b2
+    // LEFT Table:
+    //  a1  b1  c1
+    //  1   1   10
+    //  3   3   30
+    //  5   5   50
+    //  7   7   70
+    //  9   8   90
+    //  11  8   110
+    // 13   10  130
+    // RIGHT Table:
+    //  a2   b2  c2
+    //  2    2   20
+    //  4    4   40
+    //  6    6   60
+    //  8    8   80
+    // 10   10  100
+    // 12   10  120
+    // The result is
+    // "+----+----+-----+----+----+-----+",
+    // "| a1 | b1 | c1  | a2 | b2 | c2  |",
+    // "+----+----+-----+----+----+-----+",
+    // "| 11 | 8  | 110 | 8  | 8  | 80  |",
+    // "| 13 | 10 | 130 | 10 | 10 | 100 |",
+    // "| 13 | 10 | 130 | 12 | 10 | 120 |",
+    // "| 9  | 8  | 90  | 8  | 8  | 80  |",
+    // "+----+----+-----+----+----+-----+"
+    // And the result of build and probe indices are:
+    // Build indices:  5, 6, 6, 4
+    // Probe indices: 3, 4, 5, 3
+    #[allow(clippy::too_many_arguments)]
+    pub fn build_equal_condition_join_indices(
+        build_hashmap: &SymmetricJoinHashMap,
+        build_input_buffer: &RecordBatch,
+        probe_batch: &RecordBatch,
+        build_on: &[Column],
+        probe_on: &[Column],
+        random_state: &RandomState,
+        null_equals_null: bool,
+        hashes_buffer: &mut Vec<u64>,
+        offset: Option<usize>,
+    ) -> Result<(UInt64Array, UInt32Array)> {
+        let keys_values = probe_on
+            .iter()
+            .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        let build_join_values = build_on
+            .iter()
+            .map(|c| {
+                Ok(c.evaluate(build_input_buffer)?
+                    .into_array(build_input_buffer.num_rows()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        hashes_buffer.clear();
+        hashes_buffer.resize(probe_batch.num_rows(), 0);
+        let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+        // Using a buffer builder to avoid slower normal builder
+        let mut build_indices = UInt64BufferBuilder::new(0);
+        let mut probe_indices = UInt32BufferBuilder::new(0);
+        let offset_value = offset.unwrap_or(0);
+        // Visit all of the probe rows
+        for (row, hash_value) in hash_values.iter().enumerate() {
+            // Get the hash and find it in the build index
+            // For every item on the build and probe we check if it matches
+            // This possibly contains rows with hash collisions,
+            // So we have to check here whether rows are equal or not
+            if let Some((_, indices)) = build_hashmap
+                .0
+                .get(*hash_value, |(hash, _)| *hash_value == *hash)
+            {
+                for &i in indices {
+                    let offset_build_index = i as usize - offset_value;
+                    // Check hash collisions
+                    if equal_rows(
+                        offset_build_index,
+                        row,
+                        &build_join_values,
+                        &keys_values,
+                        null_equals_null,
+                    )? {
+                        build_indices.append(offset_build_index as u64);
+                        probe_indices.append(row as u32);
+                    }
+                }
+            }
+        }
+
+        Ok((
+            PrimitiveArray::new(build_indices.finish().into(), None),
+            PrimitiveArray::new(probe_indices.finish().into(), None),
+        ))
+    }
+
     /// This method performs a join between the build side input buffer and the probe side batch.
     ///
     /// # Arguments
@@ -1177,7 +1333,7 @@ impl OneSideHashJoiner {
         if self.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
             return Ok(None);
         }
-        let (build_indices, probe_indices) = build_join_indices(
+        let (build_indices, probe_indices) = Self::build_join_indices(
             probe_batch,
             &self.hashmap,
             &self.input_buffer,