You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by av...@apache.org on 2023/01/29 19:46:30 UTC

[arrow-datafusion] branch master updated: Insert target columns empty fix (#5079)

This is an automated email from the ASF dual-hosted git repository.

avantgardner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3133526c1 Insert target columns empty fix (#5079)
3133526c1 is described below

commit 3133526c1f697de43173f6a10c111588307835da
Author: Marko Grujic <ma...@gmail.com>
AuthorDate: Sun Jan 29 20:46:24 2023 +0100

    Insert target columns empty fix (#5079)
    
    * Use all columns in a table when none are specified for an INSERT
    
    * Add some tests for INSERT without target columns and edge cases
    
    * Add test for non-existent column supplied in INSERT statement
---
 datafusion/common/src/config.rs          |  2 +-
 datafusion/sql/src/statement.rs          | 42 +++++++++++------
 datafusion/sql/tests/integration_test.rs | 79 +++++++++++++++++++-------------
 3 files changed, 77 insertions(+), 46 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 6c61af76a..3d67ddb7e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -323,7 +323,7 @@ pub struct ConfigOptions {
     pub catalog: CatalogOptions,
     /// Execution options
     pub execution: ExecutionOptions,
-    /// Explain options
+    /// Optimizer options
     pub optimizer: OptimizerOptions,
     /// Explain options
     pub explain: ExplainOptions,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 1428e92a5..8fe3dfdba 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -26,8 +26,8 @@ use crate::utils::normalize_ident;
 use arrow_schema::DataType;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
-    Column, DFSchema, DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference,
-    Result, TableReference, ToDFSchema,
+    Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ExprSchema,
+    OwnedTableReference, Result, TableReference, ToDFSchema,
 };
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
 use datafusion_expr::logical_plan::builder::project;
@@ -792,6 +792,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let arrow_schema = (*provider.schema()).clone();
         let table_schema = DFSchema::try_from(arrow_schema)?;
 
+        let fields = if columns.is_empty() {
+            // Empty means we're inserting into all columns of the table
+            table_schema.fields().clone()
+        } else {
+            let fields = columns
+                .iter()
+                .map(|c| {
+                    Ok(table_schema
+                        .field_with_unqualified_name(&normalize_ident(c.clone()))?
+                        .clone())
+                })
+                .collect::<Result<Vec<DFField>>>()?;
+            // Validate no duplicate fields
+            let table_schema =
+                DFSchema::new_with_metadata(fields, table_schema.metadata().clone())?;
+            table_schema.fields().clone()
+        };
+
         // infer types for Values clause... other types should be resolvable the regular way
         let mut prepare_param_data_types = BTreeMap::new();
         if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
@@ -804,14 +822,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                                     "Can't parse placeholder: {name}"
                                 ))
                             })? - 1;
-                        let col = columns.get(idx).ok_or_else(|| {
+                        let field = fields.get(idx).ok_or_else(|| {
                             DataFusionError::Plan(format!(
                                 "Placeholder ${} refers to a non existent column",
                                 idx + 1
                             ))
                         })?;
-                        let field =
-                            table_schema.field_with_name(None, col.value.as_str())?;
                         let dt = field.field().data_type().clone();
                         let _ = prepare_param_data_types.insert(name, dt);
                     }
@@ -824,21 +840,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let mut planner_context =
             PlannerContext::new_with_prepare_param_data_types(prepare_param_data_types);
         let source = self.query_to_plan(*source, &mut planner_context)?;
-        if columns.len() != source.schema().fields().len() {
+        if fields.len() != source.schema().fields().len() {
             Err(DataFusionError::Plan(
                 "Column count doesn't match insert query!".to_owned(),
             ))?;
         }
-        let values_schema = source.schema();
-        let exprs = columns
+        let exprs = fields
             .iter()
             .zip(source.schema().fields().iter())
-            .map(|(c, f)| {
-                let col_name = c.value.clone();
-                let col = table_schema.field_with_name(None, col_name.as_str())?;
-                let expr = datafusion_expr::Expr::Column(Column::from(f.name().clone()))
-                    .alias(col_name)
-                    .cast_to(col.data_type(), values_schema)?;
+            .map(|(target_field, source_field)| {
+                let expr =
+                    datafusion_expr::Expr::Column(source_field.unqualified_column())
+                        .cast_to(target_field.data_type(), source.schema())?
+                        .alias(target_field.name());
                 Ok(expr)
             })
             .collect::<Result<Vec<datafusion_expr::Expr>>>()?;
diff --git a/datafusion/sql/tests/integration_test.rs b/datafusion/sql/tests/integration_test.rs
index f2af65e87..270727259 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -165,13 +165,60 @@ fn plan_insert() {
         "insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
     let plan = r#"
 Dml: op=[Insert] table=[person]
-  Projection: CAST(column1 AS id AS UInt32), column2 AS first_name, column3 AS last_name
+  Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name
     Values: (Int64(1), Utf8("Alan"), Utf8("Turing"))
     "#
     .trim();
     quick_test(sql, plan);
 }
 
+#[test]
+fn plan_insert_no_target_columns() {
+    let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)";
+    let plan = r#"
+Dml: op=[Insert] table=[test_decimal]
+  Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price
+    Values: (Int64(1), Int64(2)), (Int64(3), Int64(4))
+    "#
+    .trim();
+    quick_test(sql, plan);
+}
+
+#[rstest]
+#[case::duplicate_columns(
+    "INSERT INTO test_decimal (id, price, price) VALUES (1, 2, 3), (4, 5, 6)",
+    "Schema error: Schema contains duplicate unqualified field name 'price'"
+)]
+#[case::non_existing_column(
+    "INSERT INTO test_decimal (nonexistent, price) VALUES (1, 2), (4, 5)",
+    "Schema error: No field named 'nonexistent'. Valid fields are 'id', 'price'."
+)]
+#[case::type_mismatch(
+    "INSERT INTO test_decimal SELECT '2022-01-01', to_timestamp('2022-01-01T12:00:00')",
+    "Error during planning: Cannot automatically convert Timestamp(Nanosecond, None) to Decimal128(10, 2)"
+)]
+#[case::target_column_count_mismatch(
+    "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)",
+    "Error during planning: Column count doesn't match insert query!"
+)]
+#[case::source_column_count_mismatch(
+    "INSERT INTO person VALUES ($1, $2)",
+    "Error during planning: Column count doesn't match insert query!"
+)]
+#[case::extra_placeholder(
+    "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",
+    "Error during planning: Placeholder $4 refers to a non existent column"
+)]
+#[case::placeholder_type_unresolved(
+    "INSERT INTO person (id, first_name, last_name) VALUES ($2, $4, $6)",
+    "Error during planning: Placeholder type could not be resolved"
+)]
+#[test]
+fn test_insert_schema_errors(#[case] sql: &str, #[case] error: &str) {
+    let err = logical_plan(sql).unwrap_err();
+    assert_eq!(err.to_string(), error)
+}
+
 #[test]
 fn plan_update() {
     let sql = "update person set last_name='Kay' where id=1";
@@ -3464,36 +3511,6 @@ Dml: op=[Insert] table=[person]
     prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
 }
 
-#[test]
-#[should_panic(expected = "Placeholder $4 refers to a non existent column")]
-fn test_prepare_statement_insert_infer_gt() {
-    let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3, $4)";
-
-    let expected_plan = r#""#.trim();
-    let expected_dt = "[Int32]";
-    let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
-}
-
-#[test]
-#[should_panic(expected = "value: Plan(\"Column count doesn't match insert query!\")")]
-fn test_prepare_statement_insert_infer_lt() {
-    let sql = "insert into person (id, first_name, last_name) values ($1, $2)";
-
-    let expected_plan = r#""#.trim();
-    let expected_dt = "[Int32]";
-    let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
-}
-
-#[test]
-#[should_panic(expected = "value: Plan(\"Placeholder type could not be resolved\")")]
-fn test_prepare_statement_insert_infer_gap() {
-    let sql = "insert into person (id, first_name, last_name) values ($2, $4, $6)";
-
-    let expected_plan = r#""#.trim();
-    let expected_dt = "[Int32]";
-    let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
-}
-
 #[test]
 fn test_prepare_statement_to_plan_one_param() {
     let sql = "PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = $1";