You are viewing a plain text version of this content; the link to its canonical version is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/04/21 02:08:50 UTC
[arrow-datafusion] branch master updated: Fix outer join output with all-null indices on empty batch (#2272)
This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b9bf53a16 Fix outer join output with all-null indices on empty batch (#2272)
b9bf53a16 is described below
commit b9bf53a16a2292b051c6f5380832f9cf25fa5305
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Thu Apr 21 10:08:45 2022 +0800
Fix outer join output with all-null indices on empty batch (#2272)
* Fix outer join output with all-null indices
* refactor test
* t1_value
* sql
* right join
---
datafusion/core/src/physical_plan/hash_join.rs | 19 +++++--
datafusion/core/tests/sql/joins.rs | 72 ++++++++++++++++++++++++++
2 files changed, 88 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs
index 488222acc..ce371fecf 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -38,7 +38,7 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt, TryStreamExt};
use tokio::sync::Mutex;
-use arrow::array::Array;
+use arrow::array::{new_null_array, Array};
use arrow::datatypes::DataType;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
@@ -593,11 +593,24 @@ fn build_batch_from_indices(
let array = match column_index.side {
JoinSide::Left => {
let array = left.column(column_index.index);
- compute::take(array.as_ref(), &left_indices, None)?
+ if array.is_empty() || left_indices.null_count() == left_indices.len() {
+ // Outer join would generate a null index when finding no match at our side.
+ // Therefore, it's possible we are empty but need to populate an n-length null array,
+ // where n is the length of the index array.
+ assert_eq!(left_indices.null_count(), left_indices.len());
+ new_null_array(array.data_type(), left_indices.len())
+ } else {
+ compute::take(array.as_ref(), &left_indices, None)?
+ }
}
JoinSide::Right => {
let array = right.column(column_index.index);
- compute::take(array.as_ref(), &right_indices, None)?
+ if array.is_empty() || right_indices.null_count() == right_indices.len() {
+ assert_eq!(right_indices.null_count(), right_indices.len());
+ new_null_array(array.data_type(), right_indices.len())
+ } else {
+ compute::take(array.as_ref(), &right_indices, None)?
+ }
}
};
columns.push(array);
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 485904157..6bfc3f384 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -948,3 +948,75 @@ async fn join_timestamp() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn left_join_should_not_panic_with_empty_side() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let t1_schema = Schema::new(vec![
+ Field::new("t1_id", DataType::Int64, true),
+ Field::new("t1_value", DataType::Utf8, false),
+ ]);
+ let t1_data = RecordBatch::try_new(
+ Arc::new(t1_schema),
+ vec![
+ Arc::new(Int64Array::from_slice(&[5247, 3821, 6321, 8821, 7748])),
+ Arc::new(StringArray::from_slice(&["a", "b", "c", "d", "e"])),
+ ],
+ )?;
+ let t1_table = MemTable::try_new(t1_data.schema(), vec![vec![t1_data]])?;
+ ctx.register_table("t1", Arc::new(t1_table))?;
+
+ let t2_schema = Schema::new(vec![
+ Field::new("t2_id", DataType::Int64, true),
+ Field::new("t2_value", DataType::Boolean, true),
+ ]);
+ let t2_data = RecordBatch::try_new(
+ Arc::new(t2_schema),
+ vec![
+ Arc::new(Int64Array::from_slice(&[358, 2820, 3804, 7748])),
+ Arc::new(BooleanArray::from(vec![
+ Some(true),
+ Some(false),
+ None,
+ None,
+ ])),
+ ],
+ )?;
+ let t2_table = MemTable::try_new(t2_data.schema(), vec![vec![t2_data]])?;
+ ctx.register_table("t2", Arc::new(t2_table))?;
+
+ let expected_left_join = vec![
+ "+-------+----------+-------+----------+",
+ "| t1_id | t1_value | t2_id | t2_value |",
+ "+-------+----------+-------+----------+",
+ "| 5247 | a | | |",
+ "| 3821 | b | | |",
+ "| 6321 | c | | |",
+ "| 8821 | d | | |",
+ "| 7748 | e | 7748 | |",
+ "+-------+----------+-------+----------+",
+ ];
+
+ let results_left_join =
+ execute_to_batches(&ctx, "SELECT * FROM t1 LEFT JOIN t2 ON t1_id = t2_id").await;
+ assert_batches_sorted_eq!(expected_left_join, &results_left_join);
+
+ let expected_right_join = vec![
+ "+-------+----------+-------+----------+",
+ "| t2_id | t2_value | t1_id | t1_value |",
+ "+-------+----------+-------+----------+",
+ "| | | 3821 | b |",
+ "| | | 5247 | a |",
+ "| | | 6321 | c |",
+ "| | | 8821 | d |",
+ "| 7748 | | 7748 | e |",
+ "+-------+----------+-------+----------+",
+ ];
+
+ let result_right_join =
+ execute_to_batches(&ctx, "SELECT * FROM t2 RIGHT JOIN t1 ON t1_id = t2_id").await;
+ assert_batches_sorted_eq!(expected_right_join, &result_right_join);
+
+ Ok(())
+}