You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/11/30 15:19:10 UTC

[arrow] branch master updated: ARROW-10693: [Rust] [DataFusion] Add support to left join

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 17805f3  ARROW-10693: [Rust] [DataFusion] Add support to left join
17805f3 is described below

commit 17805f38a44b6184e4f694b385f8e4a1981daf06
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Mon Nov 30 08:18:28 2020 -0700

    ARROW-10693: [Rust] [DataFusion] Add support to left join
    
    This PR adds support for left joins in DataFusion.
    
    The PR is divided into two commits:
    * the first commit extends `MutableArrayData` with an API that allows it to be extended with null values.
    * the second commit uses this new API to enable left joins in DataFusion.
    
    Closes #8743 from jorgecarleitao/clean_nulls
    
    Authored-by: Jorge C. Leitao <jo...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/arrow/src/array/transform/boolean.rs       |   5 +
 rust/arrow/src/array/transform/list.rs          |  13 ++
 rust/arrow/src/array/transform/mod.rs           | 177 +++++++++++++++++++++---
 rust/arrow/src/array/transform/primitive.rs     |   9 ++
 rust/arrow/src/array/transform/variable_size.rs |  13 ++
 rust/datafusion/src/logical_plan/builder.rs     |   1 +
 rust/datafusion/src/logical_plan/plan.rs        |   2 +
 rust/datafusion/src/physical_plan/hash_join.rs  |  86 +++++++++---
 rust/datafusion/src/physical_plan/hash_utils.rs |   4 +-
 rust/datafusion/src/physical_plan/planner.rs    |   1 +
 rust/datafusion/src/test/mod.rs                 |   6 +
 11 files changed, 279 insertions(+), 38 deletions(-)

diff --git a/rust/arrow/src/array/transform/boolean.rs b/rust/arrow/src/array/transform/boolean.rs
index 31634f4..660d4cd 100644
--- a/rust/arrow/src/array/transform/boolean.rs
+++ b/rust/arrow/src/array/transform/boolean.rs
@@ -38,3 +38,8 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend {
         },
     )
 }
+
+pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
+    let buffer = &mut mutable.buffers[0]; // the values bitmap
+    reserve_for_bits(buffer, mutable.len + len); // room for `len` more value bits
+}
diff --git a/rust/arrow/src/array/transform/list.rs b/rust/arrow/src/array/transform/list.rs
index 8a8ccdf..43c7287 100644
--- a/rust/arrow/src/array/transform/list.rs
+++ b/rust/arrow/src/array/transform/list.rs
@@ -81,3 +81,16 @@ pub(super) fn build_extend<T: OffsetSizeTrait>(array: &ArrayData) -> Extend {
         )
     }
 }
+
+pub(super) fn extend_nulls<T: OffsetSizeTrait>(
+    mutable: &mut _MutableArrayData,
+    len: usize,
+) {
+    let mutable_offsets = mutable.buffer::<T>(0);
+    let last_offset = mutable_offsets[mutable_offsets.len() - 1];
+
+    let offset_buffer = &mut mutable.buffers[0];
+
+    let offsets = vec![last_offset; len]; // same offset => each null list is empty
+    offset_buffer.extend_from_slice(offsets.to_byte_slice());
+}
diff --git a/rust/arrow/src/array/transform/mod.rs b/rust/arrow/src/array/transform/mod.rs
index d8e398a..6d54d7f 100644
--- a/rust/arrow/src/array/transform/mod.rs
+++ b/rust/arrow/src/array/transform/mod.rs
@@ -32,6 +32,8 @@ type ExtendNullBits<'a> = Box<Fn(&mut _MutableArrayData, usize, usize) + 'a>;
 // this is dynamic because different data_types influence how buffers and childs are extended.
 type Extend<'a> = Box<Fn(&mut _MutableArrayData, usize, usize, usize) + 'a>;
 
+type ExtendNulls = Box<Fn(&mut _MutableArrayData, usize)>; // (data, number of nulls)
+
 /// A mutable [ArrayData] that knows how to freeze itself into an [ArrayData].
 /// This is just a data container.
 #[derive(Debug)]
@@ -91,7 +93,7 @@ impl<'a> _MutableArrayData<'a> {
     }
 }
 
-fn build_extend_nulls(array: &ArrayData) -> ExtendNullBits {
+fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits {
     if let Some(bitmap) = array.null_bitmap() {
         let bytes = bitmap.bits.data();
         Box::new(move |mutable, start, len| {
@@ -104,6 +106,15 @@ fn build_extend_nulls(array: &ArrayData) -> ExtendNullBits {
                 len,
             );
         })
+    } else if use_nulls {
+        Box::new(|mutable, _, len| {
+            utils::reserve_for_bits(&mut mutable.null_buffer, mutable.len + len);
+            let write_data = mutable.null_buffer.data_mut();
+            let offset = mutable.len;
+            (0..len).for_each(|i| {
+                bit_util::set_bit(write_data, offset + i);
+            });
+        })
     } else {
         Box::new(|_, _, _| {})
     }
@@ -119,10 +130,10 @@ fn build_extend_nulls(array: &ArrayData) -> ExtendNullBits {
 /// use arrow::{array::{Int32Array, Array, MutableArrayData}};
 ///
 /// let array = Int32Array::from(vec![1, 2, 3, 4, 5]).data();
-/// // Create a new `MutableArrayData` from an array and with a capacity.
+/// // Create a new `MutableArrayData` from an array and with a capacity of 4.
 /// // Capacity here is equivalent to `Vec::with_capacity`
 /// let arrays = vec![array.as_ref()];
-/// let mut mutable = MutableArrayData::new(arrays, 4);
+/// let mut mutable = MutableArrayData::new(arrays, false, 4);
 /// mutable.extend(0, 1, 3); // extend from the slice [1..3], [2,3]
 /// mutable.extend(0, 0, 3); // extend from the slice [0..3], [1,2,3]
 /// // `.freeze()` to convert `MutableArrayData` into a `ArrayData`.
@@ -142,12 +153,16 @@ pub struct MutableArrayData<'a> {
     // at the end, when freezing [_MutableArrayData].
     dictionary: Option<ArrayDataRef>,
 
-    // the function used to extend values. This function's lifetime is bound to the array
+    // function used to extend values from arrays. This function's lifetime is bound to the array
     // because it reads values from it.
     extend_values: Vec<Extend<'a>>,
-    // the function used to extend nulls. This function's lifetime is bound to the array
+    // function used to extend nulls from arrays. This function's lifetime is bound to the array
     // because it reads nulls from it.
-    extend_nulls: Vec<ExtendNullBits<'a>>,
+    extend_null_bits: Vec<ExtendNullBits<'a>>,
+
+    // function used to extend nulls.
+    // this is independent of the arrays and therefore has no lifetime.
+    extend_nulls: ExtendNulls,
 }
 
 impl<'a> std::fmt::Debug for MutableArrayData<'a> {
@@ -214,10 +229,65 @@ fn build_extend(array: &ArrayData) -> Extend {
     }
 }
 
+/// Returns the function that appends `len` null slots to an array of type `data_type`.
+///
+/// The returned function only extends the values (or offsets) buffer; it does not write
+/// the validity bitmap, which remains the caller's responsibility.
+fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
+    use crate::datatypes::*;
+    Box::new(match data_type {
+        DataType::Boolean => boolean::extend_nulls,
+        DataType::UInt8 => primitive::extend_nulls::<u8>,
+        DataType::UInt16 => primitive::extend_nulls::<u16>,
+        DataType::UInt32 => primitive::extend_nulls::<u32>,
+        DataType::UInt64 => primitive::extend_nulls::<u64>,
+        DataType::Int8 => primitive::extend_nulls::<i8>,
+        DataType::Int16 => primitive::extend_nulls::<i16>,
+        DataType::Int32 => primitive::extend_nulls::<i32>,
+        DataType::Int64 => primitive::extend_nulls::<i64>,
+        DataType::Float32 => primitive::extend_nulls::<f32>,
+        DataType::Float64 => primitive::extend_nulls::<f64>,
+        DataType::Date32(_)
+        | DataType::Time32(_)
+        | DataType::Interval(IntervalUnit::YearMonth) => primitive::extend_nulls::<i32>,
+        DataType::Date64(_)
+        | DataType::Time64(_)
+        | DataType::Timestamp(_, _)
+        | DataType::Duration(_)
+        | DataType::Interval(IntervalUnit::DayTime) => primitive::extend_nulls::<i64>,
+        DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::<i32>,
+        DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::<i64>,
+        DataType::List(_) => list::extend_nulls::<i32>,
+        DataType::LargeList(_) => list::extend_nulls::<i64>,
+        // dictionaries only append keys, so a null slot extends the key buffer
+        DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+            DataType::UInt8 => primitive::extend_nulls::<u8>,
+            DataType::UInt16 => primitive::extend_nulls::<u16>,
+            DataType::UInt32 => primitive::extend_nulls::<u32>,
+            DataType::UInt64 => primitive::extend_nulls::<u64>,
+            DataType::Int8 => primitive::extend_nulls::<i8>,
+            DataType::Int16 => primitive::extend_nulls::<i16>,
+            DataType::Int32 => primitive::extend_nulls::<i32>,
+            DataType::Int64 => primitive::extend_nulls::<i64>,
+            _ => unreachable!(),
+        },
+        DataType::Float16 => unreachable!(),
+        // TODO: Struct, Null, FixedSizeBinary, FixedSizeList and Union are not
+        // supported yet
+        _ => {
+            todo!("extend_nulls is not yet supported for data type {:?}", data_type)
+        }
+    })
+}
+
 impl<'a> MutableArrayData<'a> {
     /// returns a new [MutableArrayData] with capacity to `capacity` slots and specialized to create an
-    /// [ArrayData] from `array`
-    pub fn new(arrays: Vec<&'a ArrayData>, capacity: usize) -> Self {
+    /// [ArrayData] from multiple `arrays`.
+    ///
+    /// `use_nulls` is a flag used to optimize insertions. It should be `false` if the only source of nulls
+    /// are the arrays themselves and `true` if the user plans to call [MutableArrayData::extend_nulls].
+    /// In other words, if `use_nulls` is `false`, calling [MutableArrayData::extend_nulls] should not be used.
+    pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
         let data_type = arrays[0].data_type();
         use crate::datatypes::*;
 
@@ -321,7 +391,7 @@ impl<'a> MutableArrayData<'a> {
                     .iter()
                     .map(|array| array.child_data()[0].as_ref())
                     .collect::<Vec<_>>();
-                vec![MutableArrayData::new(childs, capacity)]
+                vec![MutableArrayData::new(childs, use_nulls, capacity)]
             }
             // the dictionary type just appends keys and clones the values.
             DataType::Dictionary(_, _) => vec![],
@@ -336,9 +406,11 @@ impl<'a> MutableArrayData<'a> {
             _ => None,
         };
 
-        let extend_nulls = arrays
+        let extend_nulls = build_extend_nulls(data_type);
+
+        let extend_null_bits = arrays
             .iter()
-            .map(|array| build_extend_nulls(array))
+            .map(|array| build_extend_null_bits(array, use_nulls))
             .collect();
 
         let null_bytes = bit_util::ceil(capacity, 8);
@@ -355,10 +427,11 @@ impl<'a> MutableArrayData<'a> {
             child_data,
         };
         Self {
-            arrays: arrays.to_vec(),
+            arrays,
             data,
             dictionary,
             extend_values,
+            extend_null_bits,
             extend_nulls,
         }
     }
@@ -369,11 +442,21 @@ impl<'a> MutableArrayData<'a> {
     /// This function panics if the range is out of bounds, i.e. if `start + len >= array.len()`.
     pub fn extend(&mut self, index: usize, start: usize, end: usize) {
         let len = end - start;
-        (self.extend_nulls[index])(&mut self.data, start, len);
+        (self.extend_null_bits[index])(&mut self.data, start, len);
         (self.extend_values[index])(&mut self.data, index, start, len);
         self.data.len += len;
     }
 
+    /// Appends `len` null elements, disregarding the bound arrays.
+    pub fn extend_nulls(&mut self, len: usize) {
+        self.data.null_count += len;
+        // grow the validity bitmap as well, or it becomes too short once the initial
+        // capacity is exceeded; the new bits are never set (assumes zero-fill => null)
+        utils::reserve_for_bits(&mut self.data.null_buffer, self.data.len + len);
+        (self.extend_nulls)(&mut self.data, len);
+        self.data.len += len;
+    }
+
     /// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`.
     pub fn freeze(self) -> ArrayData {
         self.data.freeze(self.dictionary)
@@ -396,7 +476,7 @@ mod tests {
     fn test_primitive() {
         let b = UInt8Array::from(vec![Some(1), Some(2), Some(3)]).data();
         let arrays = vec![b.as_ref()];
-        let mut a = MutableArrayData::new(arrays, 3);
+        let mut a = MutableArrayData::new(arrays, false, 3);
         a.extend(0, 0, 2);
         let result = a.freeze();
         let array = UInt8Array::from(Arc::new(result));
@@ -410,7 +490,7 @@ mod tests {
         let b = UInt8Array::from(vec![Some(1), Some(2), Some(3)]);
         let b = b.slice(1, 2).data();
         let arrays = vec![b.as_ref()];
-        let mut a = MutableArrayData::new(arrays, 2);
+        let mut a = MutableArrayData::new(arrays, false, 2);
         a.extend(0, 0, 2);
         let result = a.freeze();
         let array = UInt8Array::from(Arc::new(result));
@@ -424,7 +504,7 @@ mod tests {
         let b = UInt8Array::from(vec![Some(1), None, Some(3)]);
         let b = b.slice(1, 2).data();
         let arrays = vec![b.as_ref()];
-        let mut a = MutableArrayData::new(arrays, 2);
+        let mut a = MutableArrayData::new(arrays, false, 2);
         a.extend(0, 0, 2);
         let result = a.freeze();
         let array = UInt8Array::from(Arc::new(result));
@@ -433,6 +513,22 @@ mod tests {
     }
 
     #[test]
+    fn test_primitive_null_offset_nulls() {
+        let b = UInt8Array::from(vec![Some(1), Some(2), Some(3)]);
+        let b = b.slice(1, 2).data(); // [2, 3]
+        let arrays = vec![b.as_ref()];
+        let mut a = MutableArrayData::new(arrays, true, 2); // `true`: extend_nulls is used
+        a.extend(0, 0, 2); // [2, 3]
+        a.extend_nulls(3); // three nulls, not taken from any source array
+        a.extend(0, 1, 2); // [3]
+        let result = a.freeze();
+        let array = UInt8Array::from(Arc::new(result));
+        let expected =
+            UInt8Array::from(vec![Some(2), Some(3), None, None, None, Some(3)]);
+        assert_eq!(array, expected);
+    }
+
+    #[test]
     fn test_list_null_offset() -> Result<()> {
         let int_builder = Int64Builder::new(24);
         let mut builder = ListBuilder::<Int64Builder>::new(int_builder);
@@ -445,7 +541,7 @@ mod tests {
         let array = builder.finish().data();
         let arrays = vec![array.as_ref()];
 
-        let mut mutable = MutableArrayData::new(arrays, 0);
+        let mut mutable = MutableArrayData::new(arrays, false, 0);
         mutable.extend(0, 0, 1);
 
         let result = mutable.freeze();
@@ -469,7 +565,7 @@ mod tests {
             StringArray::from(vec![Some("a"), Some("bc"), None, Some("defh")]).data();
         let arrays = vec![array.as_ref()];
 
-        let mut mutable = MutableArrayData::new(arrays, 0);
+        let mut mutable = MutableArrayData::new(arrays, false, 0);
 
         mutable.extend(0, 1, 3);
 
@@ -490,7 +586,7 @@ mod tests {
 
         let arrays = vec![&array];
 
-        let mut mutable = MutableArrayData::new(arrays, 0);
+        let mut mutable = MutableArrayData::new(arrays, false, 0);
 
         mutable.extend(0, 0, 3);
 
@@ -502,12 +598,51 @@ mod tests {
     }
 
     #[test]
+    fn test_string_offsets() {
+        let array =
+            StringArray::from(vec![Some("a"), Some("bc"), None, Some("defh")]).data();
+        let array = array.slice(1, 3); // ["bc", null, "defh"], starting at offset 1
+
+        let arrays = vec![&array];
+
+        let mut mutable = MutableArrayData::new(arrays, false, 0);
+
+        mutable.extend(0, 0, 3);
+
+        let result = mutable.freeze();
+        let result = StringArray::from(Arc::new(result));
+
+        let expected = StringArray::from(vec![Some("bc"), None, Some("defh")]);
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn test_string_null_offset_nulls() {
+        let array =
+            StringArray::from(vec![Some("a"), Some("bc"), None, Some("defh")]).data();
+        let array = array.slice(1, 3); // ["bc", null, "defh"]
+
+        let arrays = vec![&array];
+
+        let mut mutable = MutableArrayData::new(arrays, true, 0);
+
+        mutable.extend(0, 1, 3); // [null, "defh"]
+        mutable.extend_nulls(1); // one null, not taken from the source array
+
+        let result = mutable.freeze();
+        let result = StringArray::from(Arc::new(result));
+
+        let expected = StringArray::from(vec![None, Some("defh"), None]);
+        assert_eq!(result, expected);
+    }
+
+    #[test]
     fn test_bool() {
         let array =
             BooleanArray::from(vec![Some(false), Some(true), None, Some(false)]).data();
         let arrays = vec![array.as_ref()];
 
-        let mut mutable = MutableArrayData::new(arrays, 0);
+        let mut mutable = MutableArrayData::new(arrays, false, 0);
 
         mutable.extend(0, 1, 3);
 
@@ -544,7 +679,7 @@ mod tests {
         );
         let arrays = vec![array.as_ref()];
 
-        let mut mutable = MutableArrayData::new(arrays, 0);
+        let mut mutable = MutableArrayData::new(arrays, false, 0);
 
         mutable.extend(0, 1, 3);
 
diff --git a/rust/arrow/src/array/transform/primitive.rs b/rust/arrow/src/array/transform/primitive.rs
index 3560802..a00ae4e 100644
--- a/rust/arrow/src/array/transform/primitive.rs
+++ b/rust/arrow/src/array/transform/primitive.rs
@@ -33,3 +33,12 @@ pub(super) fn build_extend<T: ArrowNativeType>(array: &ArrayData) -> Extend {
         },
     )
 }
+
+pub(super) fn extend_nulls<T: ArrowNativeType>(
+    mutable: &mut _MutableArrayData,
+    len: usize,
+) {
+    let buffer = &mut mutable.buffers[0];
+    let bytes = vec![0u8; len * size_of::<T>()]; // one zeroed value per null slot
+    buffer.extend_from_slice(&bytes);
+}
diff --git a/rust/arrow/src/array/transform/variable_size.rs b/rust/arrow/src/array/transform/variable_size.rs
index 48e77d5..6735c84 100644
--- a/rust/arrow/src/array/transform/variable_size.rs
+++ b/rust/arrow/src/array/transform/variable_size.rs
@@ -91,3 +91,16 @@ pub(super) fn build_extend<T: OffsetSizeTrait>(array: &ArrayData) -> Extend {
         )
     }
 }
+
+pub(super) fn extend_nulls<T: OffsetSizeTrait>(
+    mutable: &mut _MutableArrayData,
+    len: usize,
+) {
+    let mutable_offsets = mutable.buffer::<T>(0);
+    let last_offset = mutable_offsets[mutable_offsets.len() - 1];
+
+    let offset_buffer = &mut mutable.buffers[0];
+
+    let offsets = vec![last_offset; len]; // same offset => each null value is empty
+    offset_buffer.extend_from_slice(offsets.to_byte_slice());
+}
diff --git a/rust/datafusion/src/logical_plan/builder.rs b/rust/datafusion/src/logical_plan/builder.rs
index 08aa4ac..b36a189 100644
--- a/rust/datafusion/src/logical_plan/builder.rs
+++ b/rust/datafusion/src/logical_plan/builder.rs
@@ -203,6 +203,7 @@ impl LogicalPlanBuilder {
                 .collect::<Vec<_>>();
             let physical_join_type = match join_type {
                 JoinType::Inner => hash_utils::JoinType::Inner,
+                JoinType::Left => hash_utils::JoinType::Left,
             };
             let physical_schema = hash_utils::build_join_schema(
                 self.plan.schema(),
diff --git a/rust/datafusion/src/logical_plan/plan.rs b/rust/datafusion/src/logical_plan/plan.rs
index 74f2082..9378fc0 100644
--- a/rust/datafusion/src/logical_plan/plan.rs
+++ b/rust/datafusion/src/logical_plan/plan.rs
@@ -46,6 +46,8 @@ pub enum TableSource {
 pub enum JoinType {
     /// Inner join
     Inner,
+    /// Left join
+    Left,
 }
 
 /// A LogicalPlan represents the different types of relational
diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs
index 8f86df2..b36b9ee 100644
--- a/rust/datafusion/src/physical_plan/hash_join.rs
+++ b/rust/datafusion/src/physical_plan/hash_join.rs
@@ -285,15 +285,14 @@ fn build_batch_from_indices(
             .map(|array| array.as_ref())
             .collect::<Vec<_>>();
         let capacity = arrays.iter().map(|array| array.len()).sum();
-        let mut mutable = MutableArrayData::new(arrays, capacity);
+        let mut mutable = MutableArrayData::new(arrays, true, capacity);
 
         let array = if is_left {
             // build the array using the left
             for (join_index, _) in indices {
                 match join_index {
                     Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
-                    // something like `mutable.extend_nulls(*row, *row + 1)`
-                    None => unimplemented!(),
+                    None => mutable.extend_nulls(1),
                 }
             }
             make_array(Arc::new(mutable.freeze()))
@@ -302,8 +301,7 @@ fn build_batch_from_indices(
             for (_, join_index) in indices {
                 match join_index {
                     Some((batch, row)) => mutable.extend(*batch, *row, *row + 1),
-                    // something like `mutable.extend_nulls(*row, *row + 1)`
-                    None => unimplemented!(),
+                    None => mutable.extend_nulls(1),
                 }
             }
             make_array(Arc::new(mutable.freeze()))
@@ -384,6 +382,31 @@ fn build_join_indexes(
             }
             Ok(indexes)
         }
+        JoinType::Left => {
+            // NOTE(review): unmatched left rows repeat if called once per right batch
+            let keys = left.keys();
+
+            let mut indexes = Vec::new(); // size is not known a priori
+            for key in keys {
+                let left_indexes = left.get(key).unwrap();
+
+                // for every item on the left and right with this key, add the respective pair
+                if let Some(right_indexes) = right.get(key) {
+                    left_indexes.iter().for_each(|x| {
+                        right_indexes.iter().for_each(|y| {
+                            // matched rows: both the left and the right index are present
+                            indexes.push((Some(*x), Some(*y)));
+                        })
+                    })
+                } else {
+                    // no match on the right: emit the left rows with no right index
+                    left_indexes.iter().for_each(|x| {
+                        indexes.push((Some(*x), None));
+                    })
+                }
+            }
+            Ok(indexes)
+        }
     }
 }
 
@@ -435,12 +458,13 @@ mod tests {
         left: Arc<dyn ExecutionPlan>,
         right: Arc<dyn ExecutionPlan>,
         on: &[(&str, &str)],
+        join_type: &JoinType,
     ) -> Result<HashJoinExec> {
         let on: Vec<_> = on
             .iter()
             .map(|(l, r)| (l.to_string(), r.to_string()))
             .collect();
-        HashJoinExec::try_new(left, right, &on, &JoinType::Inner)
+        HashJoinExec::try_new(left, right, &on, join_type)
     }
 
     /// Asserts that the rows are the same, taking into account that their order
@@ -457,7 +481,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn join_one() -> Result<()> {
+    async fn join_inner_one() -> Result<()> {
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 5]), // this has a repetition
@@ -470,7 +494,7 @@ mod tests {
         );
         let on = &[("b1", "b1")];
 
-        let join = join(left, right, on)?;
+        let join = join(left, right, on, &JoinType::Inner)?;
 
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "c2"]);
@@ -487,7 +511,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn join_one_no_shared_column_names() -> Result<()> {
+    async fn join_inner_one_no_shared_column_names() -> Result<()> {
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 5]), // this has a repetition
@@ -500,7 +524,7 @@ mod tests {
         );
         let on = &[("b1", "b2")];
 
-        let join = join(left, right, on)?;
+        let join = join(left, right, on, &JoinType::Inner)?;
 
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
@@ -517,7 +541,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn join_two() -> Result<()> {
+    async fn join_inner_two() -> Result<()> {
         let left = build_table(
             ("a1", &vec![1, 2, 2]),
             ("b2", &vec![1, 2, 2]),
@@ -530,7 +554,7 @@ mod tests {
         );
         let on = &[("a1", "a1"), ("b2", "b2")];
 
-        let join = join(left, right, on)?;
+        let join = join(left, right, on, &JoinType::Inner)?;
 
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b2", "c1", "c2"]);
@@ -549,7 +573,7 @@ mod tests {
 
     /// Test where the left has 2 parts, the right with 1 part => 1 part
     #[tokio::test]
-    async fn join_one_two_parts_left() -> Result<()> {
+    async fn join_inner_one_two_parts_left() -> Result<()> {
         let batch1 = build_table_i32(
             ("a1", &vec![1, 2]),
             ("b2", &vec![1, 2]),
@@ -569,7 +593,7 @@ mod tests {
         );
         let on = &[("a1", "a1"), ("b2", "b2")];
 
-        let join = join(left, right, on)?;
+        let join = join(left, right, on, &JoinType::Inner)?;
 
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b2", "c1", "c2"]);
@@ -588,7 +612,7 @@ mod tests {
 
     /// Test where the left has 1 part, the right has 2 parts => 2 parts
     #[tokio::test]
-    async fn join_one_two_parts_right() -> Result<()> {
+    async fn join_inner_one_two_parts_right() -> Result<()> {
         let left = build_table(
             ("a1", &vec![1, 2, 3]),
             ("b1", &vec![4, 5, 5]), // this has a repetition
@@ -609,7 +633,7 @@ mod tests {
 
         let on = &[("b1", "b1")];
 
-        let join = join(left, right, on)?;
+        let join = join(left, right, on, &JoinType::Inner)?;
 
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "c2"]);
@@ -634,4 +658,34 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn join_left_one() -> Result<()> {
+        let left = build_table(
+            ("a1", &vec![1, 2, 3]),
+            ("b1", &vec![4, 5, 7]), // 7 does not exist on the right
+            ("c1", &vec![7, 8, 9]),
+        );
+        let right = build_table(
+            ("a2", &vec![10, 20, 30]),
+            ("b1", &vec![4, 5, 6]), // 6 does not exist on the left: not in the output
+            ("c2", &vec![70, 80, 90]),
+        );
+        let on = &[("b1", "b1")];
+
+        let join = join(left, right, on, &JoinType::Left)?;
+
+        let columns = columns(&join.schema());
+        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "c2"]);
+
+        let stream = join.execute(0).await?;
+        let batches = common::collect(stream).await?;
+
+        let result = format_batch(&batches[0]);
+        let expected = vec!["1,4,7,10,70", "2,5,8,20,80", "3,7,9,NULL,NULL"];
+
+        assert_same_rows(&result, &expected);
+
+        Ok(())
+    }
 }
diff --git a/rust/datafusion/src/physical_plan/hash_utils.rs b/rust/datafusion/src/physical_plan/hash_utils.rs
index 1492c03..f41b811 100644
--- a/rust/datafusion/src/physical_plan/hash_utils.rs
+++ b/rust/datafusion/src/physical_plan/hash_utils.rs
@@ -26,6 +26,8 @@ use std::collections::HashSet;
 pub enum JoinType {
     /// Inner join
     Inner,
+    /// Left join
+    Left,
 }
 
 /// The on clause of the join, as vector of (left, right) columns.
@@ -93,7 +95,7 @@ pub fn build_join_schema(
     join_type: &JoinType,
 ) -> Schema {
     let fields: Vec<Field> = match join_type {
-        JoinType::Inner => {
+        JoinType::Inner | JoinType::Left => {
             // remove right-side join keys if they have the same names as the left-side
             let duplicate_keys = &on
                 .iter()
diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs
index b592111..3b2488b 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -301,6 +301,7 @@ impl DefaultPhysicalPlanner {
                 let right = self.create_physical_plan(right, ctx_state)?;
                 let physical_join_type = match join_type {
                     JoinType::Inner => hash_utils::JoinType::Inner,
+                    JoinType::Left => hash_utils::JoinType::Left,
                 };
                 Ok(Arc::new(HashJoinExec::try_new(
                     left,
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index 99f6c7d..eccbbf8 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -133,6 +133,12 @@ pub fn format_batch(batch: &RecordBatch) -> Vec<String> {
                 s.push(',');
             }
             let array = batch.column(column_index);
+
+            if array.is_null(row_index) {
+                s.push_str("NULL");
+                continue;
+            }
+
             match array.data_type() {
                 DataType::Utf8 => s.push_str(
                     array