You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/11/30 15:19:10 UTC
[arrow] branch master updated: ARROW-10693: [Rust] [DataFusion] Add
support to left join
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 17805f3 ARROW-10693: [Rust] [DataFusion] Add support to left join
17805f3 is described below
commit 17805f38a44b6184e4f694b385f8e4a1981daf06
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Mon Nov 30 08:18:28 2020 -0700
ARROW-10693: [Rust] [DataFusion] Add support to left join
This PR adds support to the left join in DataFusion.
The PR is divided in two:
* the first commit extends `MutableArrayData` with an API that allows it to be extended with null values.
* the second commit uses this new API to enable left joins in DataFusion
Closes #8743 from jorgecarleitao/clean_nulls
Authored-by: Jorge C. Leitao <jo...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/arrow/src/array/transform/boolean.rs | 5 +
rust/arrow/src/array/transform/list.rs | 13 ++
rust/arrow/src/array/transform/mod.rs | 177 +++++++++++++++++++++---
rust/arrow/src/array/transform/primitive.rs | 9 ++
rust/arrow/src/array/transform/variable_size.rs | 13 ++
rust/datafusion/src/logical_plan/builder.rs | 1 +
rust/datafusion/src/logical_plan/plan.rs | 2 +
rust/datafusion/src/physical_plan/hash_join.rs | 86 +++++++++---
rust/datafusion/src/physical_plan/hash_utils.rs | 4 +-
rust/datafusion/src/physical_plan/planner.rs | 1 +
rust/datafusion/src/test/mod.rs | 6 +
11 files changed, 279 insertions(+), 38 deletions(-)
diff --git a/rust/arrow/src/array/transform/boolean.rs b/rust/arrow/src/array/transform/boolean.rs
index 31634f4..660d4cd 100644
--- a/rust/arrow/src/array/transform/boolean.rs
+++ b/rust/arrow/src/array/transform/boolean.rs
@@ -38,3 +38,8 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend {
},
)
}
+
+pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
+ let buffer = &mut mutable.buffers[0];
+ reserve_for_bits(buffer, mutable.len + len);
+}
diff --git a/rust/arrow/src/array/transform/list.rs b/rust/arrow/src/array/transform/list.rs
index 8a8ccdf..43c7287 100644
--- a/rust/arrow/src/array/transform/list.rs
+++ b/rust/arrow/src/array/transform/list.rs
@@ -81,3 +81,16 @@ pub(super) fn build_extend<T: OffsetSizeTrait>(array: &ArrayData) -> Extend {
)
}
}
+
+pub(super) fn extend_nulls<T: OffsetSizeTrait>(
+ mutable: &mut _MutableArrayData,
+ len: usize,
+) {
+ let mutable_offsets = mutable.buffer::<T>(0);
+ let last_offset = mutable_offsets[mutable_offsets.len() - 1];
+
+ let offset_buffer = &mut mutable.buffers[0];
+
+ let offsets = vec![last_offset; len];
+ offset_buffer.extend_from_slice(offsets.to_byte_slice());
+}
diff --git a/rust/arrow/src/array/transform/mod.rs b/rust/arrow/src/array/transform/mod.rs
index d8e398a..6d54d7f 100644
--- a/rust/arrow/src/array/transform/mod.rs
+++ b/rust/arrow/src/array/transform/mod.rs
@@ -32,6 +32,8 @@ type ExtendNullBits<'a> = Box<Fn(&mut _MutableArrayData, usize, usize) + 'a>;
// this is dynamic because different data_types influence how buffers and childs are extended.
type Extend<'a> = Box<Fn(&mut _MutableArrayData, usize, usize, usize) + 'a>;
+type ExtendNulls = Box<Fn(&mut _MutableArrayData, usize) -> ()>;
+
/// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData].
/// This is just a data container.
#[derive(Debug)]
@@ -91,7 +93,7 @@ impl<'a> _MutableArrayData<'a> {
}
}
-fn build_extend_nulls(array: &ArrayData) -> ExtendNullBits {
+fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits {
if let Some(bitmap) = array.null_bitmap() {
let bytes = bitmap.bits.data();
Box::new(move |mutable, start, len| {
@@ -104,6 +106,15 @@ fn build_extend_nulls(array: &ArrayData) -> ExtendNullBits {
len,
);
})
+ } else if use_nulls {
+ Box::new(|mutable, _, len| {
+ utils::reserve_for_bits(&mut mutable.null_buffer, mutable.len + len);
+ let write_data = mutable.null_buffer.data_mut();
+ let offset = mutable.len;
+ (0..len).for_each(|i| {
+ bit_util::set_bit(write_data, offset + i);
+ });
+ })
} else {
Box::new(|_, _, _| {})
}
@@ -119,10 +130,10 @@ fn build_extend_nulls(array: &ArrayData) -> ExtendNullBits {
/// use arrow::{array::{Int32Array, Array, MutableArrayData}};
///
/// let array = Int32Array::from(vec![1, 2, 3, 4, 5]).data();
-/// // Create a new `MutableArrayData` from an array and with a capacity.
+/// // Create a new `MutableArrayData` from an array and with a capacity of 4.
/// // Capacity here is equivalent to `Vec::with_capacity`
/// let arrays = vec![array.as_ref()];
-/// let mut mutable = MutableArrayData::new(arrays, 4);
+/// let mut mutable = MutableArrayData::new(arrays, false, 4);
/// mutable.extend(0, 1, 3); // extend from the slice [1..3], [2,3]
/// mutable.extend(0, 0, 3); // extend from the slice [0..3], [1,2,3]
/// // `.freeze()` to convert `MutableArrayData` into a `ArrayData`.
@@ -142,12 +153,16 @@ pub struct MutableArrayData<'a> {
// at the end, when freezing [_MutableArrayData].
dictionary: Option<ArrayDataRef>,
- // the function used to extend values. This function's lifetime is bound to the array
+ // function used to extend values from arrays. This function's lifetime is bound to the array
// because it reads values from it.
extend_values: Vec<Extend<'a>>,
- // the function used to extend nulls. This function's lifetime is bound to the array
+ // function used to extend nulls from arrays. This function's lifetime is bound to the array
// because it reads nulls from it.
- extend_nulls: Vec<ExtendNullBits<'a>>,
+ extend_null_bits: Vec<ExtendNullBits<'a>>,
+
+ // function used to extend nulls.
+ // this is independent of the arrays and therefore has no lifetime.
+ extend_nulls: ExtendNulls,
}
impl<'a> std::fmt::Debug for MutableArrayData<'a> {
@@ -214,10 +229,65 @@ fn build_extend(array: &ArrayData) -> Extend {
}
}
+fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
+ use crate::datatypes::*;
+ Box::new(match data_type {
+ DataType::Boolean => boolean::extend_nulls,
+ DataType::UInt8 => primitive::extend_nulls::<u8>,
+ DataType::UInt16 => primitive::extend_nulls::<u16>,
+ DataType::UInt32 => primitive::extend_nulls::<u32>,
+ DataType::UInt64 => primitive::extend_nulls::<u64>,
+ DataType::Int8 => primitive::extend_nulls::<i8>,
+ DataType::Int16 => primitive::extend_nulls::<i16>,
+ DataType::Int32 => primitive::extend_nulls::<i32>,
+ DataType::Int64 => primitive::extend_nulls::<i64>,
+ DataType::Float32 => primitive::extend_nulls::<f32>,
+ DataType::Float64 => primitive::extend_nulls::<f64>,
+ DataType::Date32(_)
+ | DataType::Time32(_)
+ | DataType::Interval(IntervalUnit::YearMonth) => primitive::extend_nulls::<i32>,
+ DataType::Date64(_)
+ | DataType::Time64(_)
+ | DataType::Timestamp(_, _)
+ | DataType::Duration(_)
+ | DataType::Interval(IntervalUnit::DayTime) => primitive::extend_nulls::<i64>,
+ DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::<i32>,
+ DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::<i64>,
+ DataType::List(_) => list::extend_nulls::<i32>,
+ DataType::LargeList(_) => list::extend_nulls::<i64>,
+ DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
+ DataType::UInt8 => primitive::extend_nulls::<u8>,
+ DataType::UInt16 => primitive::extend_nulls::<u16>,
+ DataType::UInt32 => primitive::extend_nulls::<u32>,
+ DataType::UInt64 => primitive::extend_nulls::<u64>,
+ DataType::Int8 => primitive::extend_nulls::<i8>,
+ DataType::Int16 => primitive::extend_nulls::<i16>,
+ DataType::Int32 => primitive::extend_nulls::<i32>,
+ DataType::Int64 => primitive::extend_nulls::<i64>,
+ _ => unreachable!(),
+ },
+ //DataType::Struct(_) => structure::build_extend(array),
+ DataType::Float16 => unreachable!(),
+ /*
+ DataType::Null => {}
+ DataType::FixedSizeBinary(_) => {}
+ DataType::FixedSizeList(_, _) => {}
+ DataType::Union(_) => {}
+ */
+ _ => {
+ todo!("Take and filter operations still not supported for this datatype")
+ }
+ })
+}
+
impl<'a> MutableArrayData<'a> {
/// returns a new [MutableArrayData] with capacity to `capacity` slots and specialized to create an
- /// [ArrayData] from `array`
- pub fn new(arrays: Vec<&'a ArrayData>, capacity: usize) -> Self {
+ /// [ArrayData] from multiple `arrays`.
+ ///
+ /// `use_nulls` is a flag used to optimize insertions. It should be `false` if the only source of nulls
+ /// are the arrays themselves and `true` if the user plans to call [MutableArrayData::extend_nulls].
+ /// In other words, if `use_nulls` is `false`, [MutableArrayData::extend_nulls] should not be called.
+ pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
let data_type = arrays[0].data_type();
use crate::datatypes::*;
@@ -321,7 +391,7 @@ impl<'a> MutableArrayData<'a> {
.iter()
.map(|array| array.child_data()[0].as_ref())
.collect::<Vec<_>>();
- vec![MutableArrayData::new(childs, capacity)]
+ vec![MutableArrayData::new(childs, use_nulls, capacity)]
}
// the dictionary type just appends keys and clones the values.
DataType::Dictionary(_, _) => vec![],
@@ -336,9 +406,11 @@ impl<'a> MutableArrayData<'a> {
_ => None,
};
- let extend_nulls = arrays
+ let extend_nulls = build_extend_nulls(data_type);
+
+ let extend_null_bits = arrays
.iter()
- .map(|array| build_extend_nulls(array))
+ .map(|array| build_extend_null_bits(array, use_nulls))
.collect();
let null_bytes = bit_util::ceil(capacity, 8);
@@ -355,10 +427,11 @@ impl<'a> MutableArrayData<'a> {
child_data,
};
Self {
- arrays: arrays.to_vec(),
+ arrays,
data,
dictionary,
extend_values,
+ extend_null_bits,
extend_nulls,
}
}
@@ -369,11 +442,18 @@ impl<'a> MutableArrayData<'a> {
/// This function panics if the range is out of bounds, i.e. if `start + len >= array.len()`.
pub fn extend(&mut self, index: usize, start: usize, end: usize) {
let len = end - start;
- (self.extend_nulls[index])(&mut self.data, start, len);
+ (self.extend_null_bits[index])(&mut self.data, start, len);
(self.extend_values[index])(&mut self.data, index, start, len);
self.data.len += len;
}
+ /// Extends this [MutableArrayData] with null elements, disregarding the bound arrays
+ pub fn extend_nulls(&mut self, len: usize) {
+ self.data.null_count += len;
+ (self.extend_nulls)(&mut self.data, len);
+ self.data.len += len;
+ }
+
/// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`.
pub fn freeze(self) -> ArrayData {
self.data.freeze(self.dictionary)
@@ -396,7 +476,7 @@ mod tests {
fn test_primitive() {
let b = UInt8Array::from(vec![Some(1), Some(2), Some(3)]).data();
let arrays = vec![b.as_ref()];
- let mut a = MutableArrayData::new(arrays, 3);
+ let mut a = MutableArrayData::new(arrays, false, 3);
a.extend(0, 0, 2);
let result = a.freeze();
let array = UInt8Array::from(Arc::new(result));
@@ -410,7 +490,7 @@ mod tests {
let b = UInt8Array::from(vec![Some(1), Some(2), Some(3)]);
let b = b.slice(1, 2).data();
let arrays = vec![b.as_ref()];
- let mut a = MutableArrayData::new(arrays, 2);
+ let mut a = MutableArrayData::new(arrays, false, 2);
a.extend(0, 0, 2);
let result = a.freeze();
let array = UInt8Array::from(Arc::new(result));
@@ -424,7 +504,7 @@ mod tests {
let b = UInt8Array::from(vec![Some(1), None, Some(3)]);
let b = b.slice(1, 2).data();
let arrays = vec![b.as_ref()];
- let mut a = MutableArrayData::new(arrays, 2);
+ let mut a = MutableArrayData::new(arrays, false, 2);
a.extend(0, 0, 2);
let result = a.freeze();
let array = UInt8Array::from(Arc::new(result));
@@ -433,6 +513,22 @@ mod tests {
}
#[test]
+ fn test_primitive_null_offset_nulls() {
+ let b = UInt8Array::from(vec![Some(1), Some(2), Some(3)]);
+ let b = b.slice(1, 2).data();
+ let arrays = vec![b.as_ref()];
+ let mut a = MutableArrayData::new(arrays, true, 2);
+ a.extend(0, 0, 2);
+ a.extend_nulls(3);
+ a.extend(0, 1, 2);
+ let result = a.freeze();
+ let array = UInt8Array::from(Arc::new(result));
+ let expected =
+ UInt8Array::from(vec![Some(2), Some(3), None, None, None, Some(3)]);
+ assert_eq!(array, expected);
+ }
+
+ #[test]
fn test_list_null_offset() -> Result<()> {
let int_builder = Int64Builder::new(24);
let mut builder = ListBuilder::<Int64Builder>::new(int_builder);
@@ -445,7 +541,7 @@ mod tests {
let array = builder.finish().data();
let arrays = vec![array.as_ref()];
- let mut mutable = MutableArrayData::new(arrays, 0);
+ let mut mutable = MutableArrayData::new(arrays, false, 0);
mutable.extend(0, 0, 1);
let result = mutable.freeze();
@@ -469,7 +565,7 @@ mod tests {
StringArray::from(vec![Some("a"), Some("bc"), None, Some("defh")]).data();
let arrays = vec![array.as_ref()];
- let mut mutable = MutableArrayData::new(arrays, 0);
+ let mut mutable = MutableArrayData::new(arrays, false, 0);
mutable.extend(0, 1, 3);
@@ -490,7 +586,7 @@ mod tests {
let arrays = vec![&array];
- let mut mutable = MutableArrayData::new(arrays, 0);
+ let mut mutable = MutableArrayData::new(arrays, false, 0);
mutable.extend(0, 0, 3);
@@ -502,12 +598,51 @@ mod tests {
}
#[test]
+ fn test_string_offsets() {
+ let array =
+ StringArray::from(vec![Some("a"), Some("bc"), None, Some("defh")]).data();
+ let array = array.slice(1, 3);
+
+ let arrays = vec![&array];
+
+ let mut mutable = MutableArrayData::new(arrays, false, 0);
+
+ mutable.extend(0, 0, 3);
+
+ let result = mutable.freeze();
+ let result = StringArray::from(Arc::new(result));
+
+ let expected = StringArray::from(vec![Some("bc"), None, Some("defh")]);
+ assert_eq!(result, expected);
+ }
+
+ #[test]
+ fn test_string_null_offset_nulls() {
+ let array =
+ StringArray::from(vec![Some("a"), Some("bc"), None, Some("defh")]).data();
+ let array = array.slice(1, 3);
+
+ let arrays = vec![&array];
+
+ let mut mutable = MutableArrayData::new(arrays, true, 0);
+
+ mutable.extend(0, 1, 3);
+ mutable.extend_nulls(1);
+
+ let result = mutable.freeze();
+ let result = StringArray::from(Arc::new(result));
+
+ let expected = StringArray::from(vec![None, Some("defh"), None]);
+ assert_eq!(result, expected);
+ }
+
+ #[test]
fn test_bool() {
let array =
BooleanArray::from(vec![Some(false), Some(true), None, Some(false)]).data();
let arrays = vec![array.as_ref()];
- let mut mutable = MutableArrayData::new(arrays, 0);
+ let mut mutable = MutableArrayData::new(arrays, false, 0);
mutable.extend(0, 1, 3);
@@ -544,7 +679,7 @@ mod tests {
);
let arrays = vec![array.as_ref()];
- let mut mutable = MutableArrayData::new(arrays, 0);
+ let mut mutable = MutableArrayData::new(arrays, false, 0);
mutable.extend(0, 1, 3);
diff --git a/rust/arrow/src/array/transform/primitive.rs b/rust/arrow/src/array/transform/primitive.rs
index 3560802..a00ae4e 100644
--- a/rust/arrow/src/array/transform/primitive.rs
+++ b/rust/arrow/src/array/transform/primitive.rs
@@ -33,3 +33,12 @@ pub(super) fn build_extend<T: ArrowNativeType>(array: &ArrayData) -> Extend {
},
)
}
+
+pub(super) fn extend_nulls<T: ArrowNativeType>(
+ mutable: &mut _MutableArrayData,
+ len: usize,
+) {
+ let buffer = &mut mutable.buffers[0];
+ let bytes = vec![0u8; len * size_of::<T>()];
+ buffer.extend_from_slice(&bytes);
+}
diff --git a/rust/arrow/src/array/transform/variable_size.rs b/rust/arrow/src/array/transform/variable_size.rs
index 48e77d5..6735c84 100644
--- a/rust/arrow/src/array/transform/variable_size.rs
+++ b/rust/arrow/src/array/transform/variable_size.rs
@@ -91,3 +91,16 @@ pub(super) fn build_extend<T: OffsetSizeTrait>(array: &ArrayData) -> Extend {
)
}
}
+
+pub(super) fn extend_nulls<T: OffsetSizeTrait>(
+ mutable: &mut _MutableArrayData,
+ len: usize,
+) {
+ let mutable_offsets = mutable.buffer::<T>(0);
+ let last_offset = mutable_offsets[mutable_offsets.len() - 1];
+
+ let offset_buffer = &mut mutable.buffers[0];
+
+ let offsets = vec![last_offset; len];
+ offset_buffer.extend_from_slice(offsets.to_byte_slice());
+}
diff --git a/rust/datafusion/src/logical_plan/builder.rs b/rust/datafusion/src/logical_plan/builder.rs
index 08aa4ac..b36a189 100644
--- a/rust/datafusion/src/logical_plan/builder.rs
+++ b/rust/datafusion/src/logical_plan/builder.rs
@@ -203,6 +203,7 @@ impl LogicalPlanBuilder {
.collect::<Vec<_>>();
let physical_join_type = match join_type {
JoinType::Inner => hash_utils::JoinType::Inner,
+ JoinType::Left => hash_utils::JoinType::Left,
};
let physical_schema = hash_utils::build_join_schema(
self.plan.schema(),
diff --git a/rust/datafusion/src/logical_plan/plan.rs b/rust/datafusion/src/logical_plan/plan.rs
index 74f2082..9378fc0 100644
--- a/rust/datafusion/src/logical_plan/plan.rs
+++ b/rust/datafusion/src/logical_plan/plan.rs
@@ -46,6 +46,8 @@ pub enum TableSource {
pub enum JoinType {
/// Inner join
Inner,
+ /// Left join
+ Left,
}
/// A LogicalPlan represents the different types of relational
diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs
index 8f86df2..b36b9ee 100644
--- a/rust/datafusion/src/physical_plan/hash_join.rs
+++ b/rust/datafusion/src/physical_plan/hash_join.rs
@@ -285,15 +285,14 @@ fn build_batch_from_indices(
.map(|array| array.as_ref())
.collect::<Vec<_>>();
let capacity = arrays.iter().map(|array| array.len()).sum();
- let mut mutable = MutableArrayData::new(arrays, capacity);
+ let mut mutable = MutableArrayData::new(arrays, true, capacity);
let array = if is_left {
// build the array using the left
for (join_index, _) in indices {
match join_index {
Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
- // something like `mutable.extend_nulls(*row, *row + 1)`
- None => unimplemented!(),
+ None => mutable.extend_nulls(1),
}
}
make_array(Arc::new(mutable.freeze()))
@@ -302,8 +301,7 @@ fn build_batch_from_indices(
for (_, join_index) in indices {
match join_index {
Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
- // something like `mutable.extend_nulls(*row, *row + 1)`
- None => unimplemented!(),
+ None => mutable.extend_nulls(1),
}
}
make_array(Arc::new(mutable.freeze()))
@@ -384,6 +382,31 @@ fn build_join_indexes(
}
Ok(indexes)
}
+ JoinType::Left => {
+ // left => left keys
+ let keys = left.keys();
+
+ let mut indexes = Vec::new(); // size is not known a priori
+ for key in keys {
+ let left_indexes = left.get(key).unwrap();
+
+ // for every item on the left and right with this key, add the respective pair
+ if let Some(right_indexes) = right.get(key) {
+ left_indexes.iter().for_each(|x| {
+ right_indexes.iter().for_each(|y| {
+ // key present on both sides: left and right indices are present
+ indexes.push((Some(*x), Some(*y)));
+ })
+ })
+ } else {
+ // key not on the right => push Nones
+ left_indexes.iter().for_each(|x| {
+ indexes.push((Some(*x), None));
+ })
+ }
+ }
+ Ok(indexes)
+ }
}
}
@@ -435,12 +458,13 @@ mod tests {
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: &[(&str, &str)],
+ join_type: &JoinType,
) -> Result<HashJoinExec> {
let on: Vec<_> = on
.iter()
.map(|(l, r)| (l.to_string(), r.to_string()))
.collect();
- HashJoinExec::try_new(left, right, &on, &JoinType::Inner)
+ HashJoinExec::try_new(left, right, &on, join_type)
}
/// Asserts that the rows are the same, taking into account that their order
@@ -457,7 +481,7 @@ mod tests {
}
#[tokio::test]
- async fn join_one() -> Result<()> {
+ async fn join_inner_one() -> Result<()> {
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 5]), // this has a repetition
@@ -470,7 +494,7 @@ mod tests {
);
let on = &[("b1", "b1")];
- let join = join(left, right, on)?;
+ let join = join(left, right, on, &JoinType::Inner)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "c2"]);
@@ -487,7 +511,7 @@ mod tests {
}
#[tokio::test]
- async fn join_one_no_shared_column_names() -> Result<()> {
+ async fn join_inner_one_no_shared_column_names() -> Result<()> {
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 5]), // this has a repetition
@@ -500,7 +524,7 @@ mod tests {
);
let on = &[("b1", "b2")];
- let join = join(left, right, on)?;
+ let join = join(left, right, on, &JoinType::Inner)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
@@ -517,7 +541,7 @@ mod tests {
}
#[tokio::test]
- async fn join_two() -> Result<()> {
+ async fn join_inner_two() -> Result<()> {
let left = build_table(
("a1", &vec![1, 2, 2]),
("b2", &vec![1, 2, 2]),
@@ -530,7 +554,7 @@ mod tests {
);
let on = &[("a1", "a1"), ("b2", "b2")];
- let join = join(left, right, on)?;
+ let join = join(left, right, on, &JoinType::Inner)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b2", "c1", "c2"]);
@@ -549,7 +573,7 @@ mod tests {
/// Test where the left has 2 parts, the right with 1 part => 1 part
#[tokio::test]
- async fn join_one_two_parts_left() -> Result<()> {
+ async fn join_inner_one_two_parts_left() -> Result<()> {
let batch1 = build_table_i32(
("a1", &vec![1, 2]),
("b2", &vec![1, 2]),
@@ -569,7 +593,7 @@ mod tests {
);
let on = &[("a1", "a1"), ("b2", "b2")];
- let join = join(left, right, on)?;
+ let join = join(left, right, on, &JoinType::Inner)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b2", "c1", "c2"]);
@@ -588,7 +612,7 @@ mod tests {
/// Test where the left has 1 part, the right has 2 parts => 2 parts
#[tokio::test]
- async fn join_one_two_parts_right() -> Result<()> {
+ async fn join_inner_one_two_parts_right() -> Result<()> {
let left = build_table(
("a1", &vec![1, 2, 3]),
("b1", &vec![4, 5, 5]), // this has a repetition
@@ -609,7 +633,7 @@ mod tests {
let on = &[("b1", "b1")];
- let join = join(left, right, on)?;
+ let join = join(left, right, on, &JoinType::Inner)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "c2"]);
@@ -634,4 +658,34 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn join_left_one() -> Result<()> {
+ let left = build_table(
+ ("a1", &vec![1, 2, 3]),
+ ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
+ ("c1", &vec![7, 8, 9]),
+ );
+ let right = build_table(
+ ("a2", &vec![10, 20, 30]),
+ ("b1", &vec![4, 5, 6]),
+ ("c2", &vec![70, 80, 90]),
+ );
+ let on = &[("b1", "b1")];
+
+ let join = join(left, right, on, &JoinType::Left)?;
+
+ let columns = columns(&join.schema());
+ assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "c2"]);
+
+ let stream = join.execute(0).await?;
+ let batches = common::collect(stream).await?;
+
+ let result = format_batch(&batches[0]);
+ let expected = vec!["1,4,7,10,70", "2,5,8,20,80", "3,7,9,NULL,NULL"];
+
+ assert_same_rows(&result, &expected);
+
+ Ok(())
+ }
}
diff --git a/rust/datafusion/src/physical_plan/hash_utils.rs b/rust/datafusion/src/physical_plan/hash_utils.rs
index 1492c03..f41b811 100644
--- a/rust/datafusion/src/physical_plan/hash_utils.rs
+++ b/rust/datafusion/src/physical_plan/hash_utils.rs
@@ -26,6 +26,8 @@ use std::collections::HashSet;
pub enum JoinType {
/// Inner join
Inner,
+ /// Left join
+ Left,
}
/// The on clause of the join, as vector of (left, right) columns.
@@ -93,7 +95,7 @@ pub fn build_join_schema(
join_type: &JoinType,
) -> Schema {
let fields: Vec<Field> = match join_type {
- JoinType::Inner => {
+ JoinType::Inner | JoinType::Left => {
// remove right-side join keys if they have the same names as the left-side
let duplicate_keys = &on
.iter()
diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs
index b592111..3b2488b 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -301,6 +301,7 @@ impl DefaultPhysicalPlanner {
let right = self.create_physical_plan(right, ctx_state)?;
let physical_join_type = match join_type {
JoinType::Inner => hash_utils::JoinType::Inner,
+ JoinType::Left => hash_utils::JoinType::Left,
};
Ok(Arc::new(HashJoinExec::try_new(
left,
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index 99f6c7d..eccbbf8 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -133,6 +133,12 @@ pub fn format_batch(batch: &RecordBatch) -> Vec<String> {
s.push(',');
}
let array = batch.column(column_index);
+
+ if array.is_null(row_index) {
+ s.push_str("NULL");
+ continue;
+ }
+
match array.data_type() {
DataType::Utf8 => s.push_str(
array