You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/12/21 14:16:50 UTC
[arrow-datafusion] branch master updated: Left join could use bitmap for left join instead of Vec<bool> (#1291)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5ef42eb Left join could use bitmap for left join instead of Vec<bool> (#1291)
5ef42eb is described below
commit 5ef42ebdd75bc703097ce90e3e339861e14c91b6
Author: Boaz <be...@gmail.com>
AuthorDate: Tue Dec 21 16:16:45 2021 +0200
Left join could use bitmap for left join instead of Vec<bool> (#1291)
* Left join could use bitmap for left join instead of Vec<bool>
* fix
* Fix
* Finish implementation
* Update hash_join.rs
---
datafusion/src/physical_plan/hash_join.rs | 34 ++++++++++++++-----------------
1 file changed, 15 insertions(+), 19 deletions(-)
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index 727d1c6..8cb2f44 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -68,6 +68,7 @@ use super::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
+use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::PhysicalExpr;
@@ -401,9 +402,13 @@ impl ExecutionPlan for HashJoinExec {
let num_rows = left_data.1.num_rows();
let visited_left_side = match self.join_type {
JoinType::Left | JoinType::Full | JoinType::Semi | JoinType::Anti => {
- vec![false; num_rows]
+ let mut buffer = BooleanBufferBuilder::new(num_rows);
+
+ buffer.append_n(num_rows, false);
+
+ buffer
}
- JoinType::Inner | JoinType::Right => vec![],
+ JoinType::Inner | JoinType::Right => BooleanBufferBuilder::new(0),
};
Ok(Box::pin(HashJoinStream::new(
self.schema.clone(),
@@ -502,8 +507,7 @@ struct HashJoinStream {
/// Random state used for hashing initialization
random_state: RandomState,
/// Keeps track of the left side rows whether they are visited
- visited_left_side: Vec<bool>,
- // TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
+ visited_left_side: BooleanBufferBuilder,
/// There is nothing to process anymore and left side is processed in case of left join
is_exhausted: bool,
/// Metrics
@@ -525,7 +529,7 @@ impl HashJoinStream {
right: SendableRecordBatchStream,
column_indices: Vec<ColumnIndex>,
random_state: RandomState,
- visited_left_side: Vec<bool>,
+ visited_left_side: BooleanBufferBuilder,
join_metrics: HashJoinMetrics,
null_equals_null: bool,
) -> Self {
@@ -909,29 +913,21 @@ fn equal_rows(
// Produces a batch for left-side rows that have/have not been matched during the whole join
fn produce_from_matched(
- visited_left_side: &[bool],
+ visited_left_side: &BooleanBufferBuilder,
schema: &SchemaRef,
column_indices: &[ColumnIndex],
left_data: &JoinLeftData,
unmatched: bool,
) -> ArrowResult<RecordBatch> {
- // Find indices which didn't match any right row (are false)
let indices = if unmatched {
UInt64Array::from_iter_values(
- visited_left_side
- .iter()
- .enumerate()
- .filter(|&(_, &value)| !value)
- .map(|(index, _)| index as u64),
+ (0..visited_left_side.len())
+ .filter_map(|v| (!visited_left_side.get_bit(v)).then(|| v as u64)),
)
} else {
- // produce those that did match
UInt64Array::from_iter_values(
- visited_left_side
- .iter()
- .enumerate()
- .filter(|&(_, &value)| value)
- .map(|(index, _)| index as u64),
+ (0..visited_left_side.len())
+ .filter_map(|v| (visited_left_side.get_bit(v)).then(|| v as u64)),
)
};
@@ -991,7 +987,7 @@ impl Stream for HashJoinStream {
| JoinType::Semi
| JoinType::Anti => {
left_side.iter().flatten().for_each(|x| {
- self.visited_left_side[x as usize] = true;
+ self.visited_left_side.set_bit(x as usize, true);
});
}
JoinType::Inner | JoinType::Right => {}