You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/12/21 14:16:50 UTC

[arrow-datafusion] branch master updated: Left join could use bitmap for left join instead of Vec<bool> (#1291)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ef42eb  Left join could use bitmap for left join instead of Vec<bool> (#1291)
5ef42eb is described below

commit 5ef42ebdd75bc703097ce90e3e339861e14c91b6
Author: Boaz <be...@gmail.com>
AuthorDate: Tue Dec 21 16:16:45 2021 +0200

    Left join could use bitmap for left join instead of Vec<bool> (#1291)
    
    * Left join could use bitmap for left join instead of Vec<bool>
    
    * fix
    
    * Fix
    
    * Finish implementation
    
    * Update hash_join.rs
---
 datafusion/src/physical_plan/hash_join.rs | 34 ++++++++++++++-----------------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 727d1c6..8cb2f44 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -68,6 +68,7 @@ use super::{
     DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream,
 };
+use crate::arrow::array::BooleanBufferBuilder;
 use crate::arrow::datatypes::TimeUnit;
 use crate::physical_plan::coalesce_batches::concat_batches;
 use crate::physical_plan::PhysicalExpr;
@@ -401,9 +402,13 @@ impl ExecutionPlan for HashJoinExec {
         let num_rows = left_data.1.num_rows();
         let visited_left_side = match self.join_type {
             JoinType::Left | JoinType::Full | JoinType::Semi | JoinType::Anti => {
-                vec![false; num_rows]
+                let mut buffer = BooleanBufferBuilder::new(num_rows);
+
+                buffer.append_n(num_rows, false);
+
+                buffer
             }
-            JoinType::Inner | JoinType::Right => vec![],
+            JoinType::Inner | JoinType::Right => BooleanBufferBuilder::new(0),
         };
         Ok(Box::pin(HashJoinStream::new(
             self.schema.clone(),
@@ -502,8 +507,7 @@ struct HashJoinStream {
     /// Random state used for hashing initialization
     random_state: RandomState,
     /// Keeps track of the left side rows whether they are visited
-    visited_left_side: Vec<bool>,
-    // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
+    visited_left_side: BooleanBufferBuilder,
     /// There is nothing to process anymore and left side is processed in case of left join
     is_exhausted: bool,
     /// Metrics
@@ -525,7 +529,7 @@ impl HashJoinStream {
         right: SendableRecordBatchStream,
         column_indices: Vec<ColumnIndex>,
         random_state: RandomState,
-        visited_left_side: Vec<bool>,
+        visited_left_side: BooleanBufferBuilder,
         join_metrics: HashJoinMetrics,
         null_equals_null: bool,
     ) -> Self {
@@ -909,29 +913,21 @@ fn equal_rows(
 
 // Produces a batch for left-side rows that have/have not been matched during the whole join
 fn produce_from_matched(
-    visited_left_side: &[bool],
+    visited_left_side: &BooleanBufferBuilder,
     schema: &SchemaRef,
     column_indices: &[ColumnIndex],
     left_data: &JoinLeftData,
     unmatched: bool,
 ) -> ArrowResult<RecordBatch> {
-    // Find indices which didn't match any right row (are false)
     let indices = if unmatched {
         UInt64Array::from_iter_values(
-            visited_left_side
-                .iter()
-                .enumerate()
-                .filter(|&(_, &value)| !value)
-                .map(|(index, _)| index as u64),
+            (0..visited_left_side.len())
+                .filter_map(|v| (!visited_left_side.get_bit(v)).then(|| v as u64)),
         )
     } else {
-        // produce those that did match
         UInt64Array::from_iter_values(
-            visited_left_side
-                .iter()
-                .enumerate()
-                .filter(|&(_, &value)| value)
-                .map(|(index, _)| index as u64),
+            (0..visited_left_side.len())
+                .filter_map(|v| (visited_left_side.get_bit(v)).then(|| v as u64)),
         )
     };
 
@@ -991,7 +987,7 @@ impl Stream for HashJoinStream {
                             | JoinType::Semi
                             | JoinType::Anti => {
                                 left_side.iter().flatten().for_each(|x| {
-                                    self.visited_left_side[x as usize] = true;
+                                    self.visited_left_side.set_bit(x as usize, true);
                                 });
                             }
                             JoinType::Inner | JoinType::Right => {}