You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/04/21 02:08:50 UTC

[arrow-datafusion] branch master updated: Fix outer join output with all-null indices on empty batch (#2272)

This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b9bf53a16 Fix outer join output with all-null indices on empty batch (#2272)
b9bf53a16 is described below

commit b9bf53a16a2292b051c6f5380832f9cf25fa5305
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Thu Apr 21 10:08:45 2022 +0800

    Fix outer join output with all-null indices on empty batch (#2272)
    
    * Fix outer join output with all-null indices
    
    * refactor test
    
    * t1_value
    
    * sql
    
    * right join
---
 datafusion/core/src/physical_plan/hash_join.rs | 19 +++++--
 datafusion/core/tests/sql/joins.rs             | 72 ++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs
index 488222acc..ce371fecf 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -38,7 +38,7 @@ use async_trait::async_trait;
 use futures::{Stream, StreamExt, TryStreamExt};
 use tokio::sync::Mutex;
 
-use arrow::array::Array;
+use arrow::array::{new_null_array, Array};
 use arrow::datatypes::DataType;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
@@ -593,11 +593,24 @@ fn build_batch_from_indices(
         let array = match column_index.side {
             JoinSide::Left => {
                 let array = left.column(column_index.index);
-                compute::take(array.as_ref(), &left_indices, None)?
+                if array.is_empty() || left_indices.null_count() == left_indices.len() {
+                    // Outer join would generate a null index when finding no match at our side.
+                    // Therefore, it's possible we are empty but need to populate an n-length null array,
+                    // where n is the length of the index array.
+                    assert_eq!(left_indices.null_count(), left_indices.len());
+                    new_null_array(array.data_type(), left_indices.len())
+                } else {
+                    compute::take(array.as_ref(), &left_indices, None)?
+                }
             }
             JoinSide::Right => {
                 let array = right.column(column_index.index);
-                compute::take(array.as_ref(), &right_indices, None)?
+                if array.is_empty() || right_indices.null_count() == right_indices.len() {
+                    assert_eq!(right_indices.null_count(), right_indices.len());
+                    new_null_array(array.data_type(), right_indices.len())
+                } else {
+                    compute::take(array.as_ref(), &right_indices, None)?
+                }
             }
         };
         columns.push(array);
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 485904157..6bfc3f384 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -948,3 +948,75 @@ async fn join_timestamp() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn left_join_should_not_panic_with_empty_side() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let t1_schema = Schema::new(vec![
+        Field::new("t1_id", DataType::Int64, true),
+        Field::new("t1_value", DataType::Utf8, false),
+    ]);
+    let t1_data = RecordBatch::try_new(
+        Arc::new(t1_schema),
+        vec![
+            Arc::new(Int64Array::from_slice(&[5247, 3821, 6321, 8821, 7748])),
+            Arc::new(StringArray::from_slice(&["a", "b", "c", "d", "e"])),
+        ],
+    )?;
+    let t1_table = MemTable::try_new(t1_data.schema(), vec![vec![t1_data]])?;
+    ctx.register_table("t1", Arc::new(t1_table))?;
+
+    let t2_schema = Schema::new(vec![
+        Field::new("t2_id", DataType::Int64, true),
+        Field::new("t2_value", DataType::Boolean, true),
+    ]);
+    let t2_data = RecordBatch::try_new(
+        Arc::new(t2_schema),
+        vec![
+            Arc::new(Int64Array::from_slice(&[358, 2820, 3804, 7748])),
+            Arc::new(BooleanArray::from(vec![
+                Some(true),
+                Some(false),
+                None,
+                None,
+            ])),
+        ],
+    )?;
+    let t2_table = MemTable::try_new(t2_data.schema(), vec![vec![t2_data]])?;
+    ctx.register_table("t2", Arc::new(t2_table))?;
+
+    let expected_left_join = vec![
+        "+-------+----------+-------+----------+",
+        "| t1_id | t1_value | t2_id | t2_value |",
+        "+-------+----------+-------+----------+",
+        "| 5247  | a        |       |          |",
+        "| 3821  | b        |       |          |",
+        "| 6321  | c        |       |          |",
+        "| 8821  | d        |       |          |",
+        "| 7748  | e        | 7748  |          |",
+        "+-------+----------+-------+----------+",
+    ];
+
+    let results_left_join =
+        execute_to_batches(&ctx, "SELECT * FROM t1 LEFT JOIN t2 ON t1_id = t2_id").await;
+    assert_batches_sorted_eq!(expected_left_join, &results_left_join);
+
+    let expected_right_join = vec![
+        "+-------+----------+-------+----------+",
+        "| t2_id | t2_value | t1_id | t1_value |",
+        "+-------+----------+-------+----------+",
+        "|       |          | 3821  | b        |",
+        "|       |          | 5247  | a        |",
+        "|       |          | 6321  | c        |",
+        "|       |          | 8821  | d        |",
+        "| 7748  |          | 7748  | e        |",
+        "+-------+----------+-------+----------+",
+    ];
+
+    let result_right_join =
+        execute_to_batches(&ctx, "SELECT * FROM t2 RIGHT JOIN t1 ON t1_id = t2_id").await;
+    assert_batches_sorted_eq!(expected_right_join, &result_right_join);
+
+    Ok(())
+}