You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original page and is not preserved in this copy.)
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/04 16:09:52 UTC

[arrow-datafusion] branch master updated: fix: union schema (#2334)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7304719bb fix: union schema (#2334)
7304719bb is described below

commit 7304719bb4830c873af32f873ce22f205fef4c77
Author: George Andronchik <g....@gmail.com>
AuthorDate: Thu May 5 00:09:47 2022 +0800

    fix: union schema (#2334)
---
 datafusion/core/src/execution/context.rs    | 38 +++++++++++++++++++++++++++++
 datafusion/core/src/logical_plan/builder.rs | 35 ++++++++++++++------------
 2 files changed, 57 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 895e5bc1e..0f544ca89 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -2090,6 +2090,44 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn union_test() -> Result<()> {
+        let ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        let result = ctx
+            .sql("SELECT 1 A UNION ALL SELECT 2")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "| 2 |", "+---+"];
+        assert_batches_eq!(expected, &result);
+
+        let result = ctx
+            .sql("SELECT 1 UNION SELECT 2")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        let expected = vec![
+            "+----------+",
+            "| Int64(1) |",
+            "+----------+",
+            "| 1        |",
+            "| 2        |",
+            "+----------+",
+        ];
+        assert_batches_eq!(expected, &result);
+
+        Ok(())
+    }
+
     struct MyPhysicalPlanner {}
 
     #[async_trait]
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 4f00bec60..1fbb1f5f9 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -1055,19 +1055,31 @@ pub fn union_with_alias(
     right_plan: LogicalPlan,
     alias: Option<String>,
 ) -> Result<LogicalPlan> {
-    let inputs = vec![left_plan, right_plan]
+    let union_schema = left_plan.schema().clone();
+    let inputs_iter = vec![left_plan, right_plan]
         .into_iter()
         .flat_map(|p| match p {
             LogicalPlan::Union(Union { inputs, .. }) => inputs,
             x => vec![x],
-        })
+        });
+
+    inputs_iter
+        .clone()
+        .skip(1)
+        .try_for_each(|input_plan| -> Result<()> {
+            union_schema.check_arrow_schema_type_compatible(
+                &((**input_plan.schema()).clone().into()),
+            )
+        })?;
+
+    let inputs = inputs_iter
         .map(|p| match p {
             LogicalPlan::Projection(Projection {
-                expr,
-                input,
-                schema,
-                alias,
-            }) => project_with_column_index_alias(expr, input, schema, alias).unwrap(),
+                expr, input, alias, ..
+            }) => {
+                project_with_column_index_alias(expr, input, union_schema.clone(), alias)
+                    .unwrap()
+            }
             x => x,
         })
         .collect::<Vec<_>>();
@@ -1081,15 +1093,6 @@ pub fn union_with_alias(
         None => union_schema.strip_qualifiers(),
     });
 
-    inputs
-        .iter()
-        .skip(1)
-        .try_for_each(|input_plan| -> Result<()> {
-            union_schema.check_arrow_schema_type_compatible(
-                &((**input_plan.schema()).clone().into()),
-            )
-        })?;
-
     Ok(LogicalPlan::Union(Union {
         inputs,
         schema: union_schema,