You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/06/18 13:09:33 UTC
[arrow-datafusion] branch master updated: Support dates in hash join (#2746)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9c6ee300a Support dates in hash join (#2746)
9c6ee300a is described below
commit 9c6ee300a6dc90e2a101cec99f1500031bd3882a
Author: Andy Grove <ag...@apache.org>
AuthorDate: Sat Jun 18 07:09:29 2022 -0600
Support dates in hash join (#2746)
---
datafusion/core/src/physical_plan/hash_join.rs | 57 ++++++++++++++++++++++++--
1 file changed, 54 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs
index 4777e1c44..042d9525f 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -22,9 +22,10 @@ use ahash::RandomState;
use arrow::{
array::{
- ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray,
- TimestampMicrosecondArray, TimestampMillisecondArray, TimestampSecondArray,
- UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder,
+ ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, LargeStringArray,
+ PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray,
+ TimestampSecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder,
+ UInt64Builder,
},
compute,
datatypes::{UInt32Type, UInt64Type},
@@ -999,6 +1000,12 @@ fn equal_rows(
DataType::Float64 => {
equal_rows_elem!(Float64Array, l, r, left, right, null_equals_null)
}
+ DataType::Date32 => {
+ equal_rows_elem!(Date32Array, l, r, left, right, null_equals_null)
+ }
+ DataType::Date64 => {
+ equal_rows_elem!(Date64Array, l, r, left, right, null_equals_null)
+ }
DataType::Timestamp(time_unit, None) => match time_unit {
TimeUnit::Second => {
equal_rows_elem!(
@@ -2399,4 +2406,48 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn join_date32() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("date", DataType::Date32, false),
+ Field::new("n", DataType::Int32, false),
+ ]));
+
+ let dates: ArrayRef = Arc::new(Date32Array::from(vec![19107, 19108, 19109]));
+ let n: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let batch = RecordBatch::try_new(schema.clone(), vec![dates, n])?;
+ let left =
+ Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap());
+
+ let dates: ArrayRef = Arc::new(Date32Array::from(vec![19108, 19108, 19109]));
+ let n: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
+ let batch = RecordBatch::try_new(schema.clone(), vec![dates, n])?;
+ let right = Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap());
+
+ let on = vec![(
+ Column::new_with_schema("date", &left.schema()).unwrap(),
+ Column::new_with_schema("date", &right.schema()).unwrap(),
+ )];
+
+ let join = join(left, right, on, &JoinType::Inner, false)?;
+
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let stream = join.execute(0, task_ctx)?;
+ let batches = common::collect(stream).await?;
+
+ let expected = vec![
+ "+------------+---+------------+---+",
+ "| date       | n | date       | n |",
+ "+------------+---+------------+---+",
+ "| 2022-04-26 | 2 | 2022-04-26 | 4 |",
+ "| 2022-04-26 | 2 | 2022-04-26 | 5 |",
+ "| 2022-04-27 | 3 | 2022-04-27 | 6 |",
+ "+------------+---+------------+---+",
+ ];
+ assert_batches_sorted_eq!(expected, &batches);
+
+ Ok(())
+ }
}