You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/04 16:09:52 UTC
[arrow-datafusion] branch master updated: fix: union schema (#2334)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7304719bb fix: union schema (#2334)
7304719bb is described below
commit 7304719bb4830c873af32f873ce22f205fef4c77
Author: George Andronchik <g....@gmail.com>
AuthorDate: Thu May 5 00:09:47 2022 +0800
fix: union schema (#2334)
---
datafusion/core/src/execution/context.rs | 38 +++++++++++++++++++++++++++++
datafusion/core/src/logical_plan/builder.rs | 35 ++++++++++++++------------
2 files changed, 57 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 895e5bc1e..0f544ca89 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -2090,6 +2090,44 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn union_test() -> Result<()> { // checks column header and rows produced by UNION ALL / UNION queries (added with "fix: union schema", #2334)
+ let ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ let result = ctx
+ .sql("SELECT 1 A UNION ALL SELECT 2") // only the left-hand SELECT carries an alias
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "| 2 |", "+---+"]; // single column headed "a" (lowercased alias), rows 1 and 2
+ assert_batches_eq!(expected, &result);
+
+ let result = ctx
+ .sql("SELECT 1 UNION SELECT 2") // no alias on either side; plain UNION rather than UNION ALL
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let expected = vec![
+ "+----------+",
+ "| Int64(1) |", // header expected for the unaliased literal of the left-hand SELECT
+ "+----------+",
+ "| 1 |",
+ "| 2 |",
+ "+----------+",
+ ];
+ assert_batches_eq!(expected, &result);
+
+ Ok(())
+ }
+
struct MyPhysicalPlanner {}
#[async_trait]
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 4f00bec60..1fbb1f5f9 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -1055,19 +1055,31 @@ pub fn union_with_alias(
right_plan: LogicalPlan,
alias: Option<String>,
) -> Result<LogicalPlan> {
- let inputs = vec![left_plan, right_plan]
+ let union_schema = left_plan.schema().clone();
+ let inputs_iter = vec![left_plan, right_plan]
.into_iter()
.flat_map(|p| match p {
LogicalPlan::Union(Union { inputs, .. }) => inputs,
x => vec![x],
- })
+ });
+
+ inputs_iter
+ .clone()
+ .skip(1)
+ .try_for_each(|input_plan| -> Result<()> {
+ union_schema.check_arrow_schema_type_compatible(
+ &((**input_plan.schema()).clone().into()),
+ )
+ })?;
+
+ let inputs = inputs_iter
.map(|p| match p {
LogicalPlan::Projection(Projection {
- expr,
- input,
- schema,
- alias,
- }) => project_with_column_index_alias(expr, input, schema, alias).unwrap(),
+ expr, input, alias, ..
+ }) => {
+ project_with_column_index_alias(expr, input, union_schema.clone(), alias)
+ .unwrap()
+ }
x => x,
})
.collect::<Vec<_>>();
@@ -1081,15 +1093,6 @@ pub fn union_with_alias(
None => union_schema.strip_qualifiers(),
});
- inputs
- .iter()
- .skip(1)
- .try_for_each(|input_plan| -> Result<()> {
- union_schema.check_arrow_schema_type_compatible(
- &((**input_plan.schema()).clone().into()),
- )
- })?;
-
Ok(LogicalPlan::Union(Union {
inputs,
schema: union_schema,