You are viewing a plain text version of this content. The canonical version is in the commits@arrow.apache.org mailing-list archive.
Posted to commits@arrow.apache.org by av...@apache.org on 2023/01/29 19:46:30 UTC
[arrow-datafusion] branch master updated: Insert target columns empty fix (#5079)
This is an automated email from the ASF dual-hosted git repository.
avantgardner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3133526c1 Insert target columns empty fix (#5079)
3133526c1 is described below
commit 3133526c1f697de43173f6a10c111588307835da
Author: Marko Grujic <ma...@gmail.com>
AuthorDate: Sun Jan 29 20:46:24 2023 +0100
Insert target columns empty fix (#5079)
* Use all columns in a table when none are specified for an INSERT
* Add some tests for INSERT without target columns and edge cases
* Add test for non-existent column supplied in INSERT statement
---
datafusion/common/src/config.rs | 2 +-
datafusion/sql/src/statement.rs | 42 +++++++++++------
datafusion/sql/tests/integration_test.rs | 79 +++++++++++++++++++-------------
3 files changed, 77 insertions(+), 46 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 6c61af76a..3d67ddb7e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -323,7 +323,7 @@ pub struct ConfigOptions {
pub catalog: CatalogOptions,
/// Execution options
pub execution: ExecutionOptions,
- /// Explain options
+ /// Optimizer options
pub optimizer: OptimizerOptions,
/// Explain options
pub explain: ExplainOptions,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 1428e92a5..8fe3dfdba 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -26,8 +26,8 @@ use crate::utils::normalize_ident;
use arrow_schema::DataType;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
- Column, DFSchema, DFSchemaRef, DataFusionError, ExprSchema, OwnedTableReference,
- Result, TableReference, ToDFSchema,
+ Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ExprSchema,
+ OwnedTableReference, Result, TableReference, ToDFSchema,
};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
use datafusion_expr::logical_plan::builder::project;
@@ -792,6 +792,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let arrow_schema = (*provider.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;
+ let fields = if columns.is_empty() {
+ // Empty means we're inserting into all columns of the table
+ table_schema.fields().clone()
+ } else {
+ let fields = columns
+ .iter()
+ .map(|c| {
+ Ok(table_schema
+ .field_with_unqualified_name(&normalize_ident(c.clone()))?
+ .clone())
+ })
+ .collect::<Result<Vec<DFField>>>()?;
+ // Validate no duplicate fields
+ let table_schema =
+ DFSchema::new_with_metadata(fields, table_schema.metadata().clone())?;
+ table_schema.fields().clone()
+ };
+
// infer types for Values clause... other types should be resolvable the regular way
let mut prepare_param_data_types = BTreeMap::new();
if let SetExpr::Values(ast::Values { rows, .. }) = (*source.body).clone() {
@@ -804,14 +822,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"Can't parse placeholder: {name}"
))
})? - 1;
- let col = columns.get(idx).ok_or_else(|| {
+ let field = fields.get(idx).ok_or_else(|| {
DataFusionError::Plan(format!(
"Placeholder ${} refers to a non existent column",
idx + 1
))
})?;
- let field =
- table_schema.field_with_name(None, col.value.as_str())?;
let dt = field.field().data_type().clone();
let _ = prepare_param_data_types.insert(name, dt);
}
@@ -824,21 +840,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut planner_context =
PlannerContext::new_with_prepare_param_data_types(prepare_param_data_types);
let source = self.query_to_plan(*source, &mut planner_context)?;
- if columns.len() != source.schema().fields().len() {
+ if fields.len() != source.schema().fields().len() {
Err(DataFusionError::Plan(
"Column count doesn't match insert query!".to_owned(),
))?;
}
- let values_schema = source.schema();
- let exprs = columns
+ let exprs = fields
.iter()
.zip(source.schema().fields().iter())
- .map(|(c, f)| {
- let col_name = c.value.clone();
- let col = table_schema.field_with_name(None, col_name.as_str())?;
- let expr = datafusion_expr::Expr::Column(Column::from(f.name().clone()))
- .alias(col_name)
- .cast_to(col.data_type(), values_schema)?;
+ .map(|(target_field, source_field)| {
+ let expr =
+ datafusion_expr::Expr::Column(source_field.unqualified_column())
+ .cast_to(target_field.data_type(), source.schema())?
+ .alias(target_field.name());
Ok(expr)
})
.collect::<Result<Vec<datafusion_expr::Expr>>>()?;
diff --git a/datafusion/sql/tests/integration_test.rs b/datafusion/sql/tests/integration_test.rs
index f2af65e87..270727259 100644
--- a/datafusion/sql/tests/integration_test.rs
+++ b/datafusion/sql/tests/integration_test.rs
@@ -165,13 +165,60 @@ fn plan_insert() {
"insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
let plan = r#"
Dml: op=[Insert] table=[person]
- Projection: CAST(column1 AS id AS UInt32), column2 AS first_name, column3 AS last_name
+ Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name
Values: (Int64(1), Utf8("Alan"), Utf8("Turing"))
"#
.trim();
quick_test(sql, plan);
}
+#[test]
+fn plan_insert_no_target_columns() {
+ let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)";
+ let plan = r#"
+Dml: op=[Insert] table=[test_decimal]
+ Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price
+ Values: (Int64(1), Int64(2)), (Int64(3), Int64(4))
+ "#
+ .trim();
+ quick_test(sql, plan);
+}
+
+#[rstest]
+#[case::duplicate_columns(
+ "INSERT INTO test_decimal (id, price, price) VALUES (1, 2, 3), (4, 5, 6)",
+ "Schema error: Schema contains duplicate unqualified field name 'price'"
+)]
+#[case::non_existing_column(
+ "INSERT INTO test_decimal (nonexistent, price) VALUES (1, 2), (4, 5)",
+ "Schema error: No field named 'nonexistent'. Valid fields are 'id', 'price'."
+)]
+#[case::type_mismatch(
+ "INSERT INTO test_decimal SELECT '2022-01-01', to_timestamp('2022-01-01T12:00:00')",
+ "Error during planning: Cannot automatically convert Timestamp(Nanosecond, None) to Decimal128(10, 2)"
+)]
+#[case::target_column_count_mismatch(
+ "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2)",
+ "Error during planning: Column count doesn't match insert query!"
+)]
+#[case::source_column_count_mismatch(
+ "INSERT INTO person VALUES ($1, $2)",
+ "Error during planning: Column count doesn't match insert query!"
+)]
+#[case::extra_placeholder(
+ "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3, $4)",
+ "Error during planning: Placeholder $4 refers to a non existent column"
+)]
+#[case::placeholder_type_unresolved(
+ "INSERT INTO person (id, first_name, last_name) VALUES ($2, $4, $6)",
+ "Error during planning: Placeholder type could not be resolved"
+)]
+#[test]
+fn test_insert_schema_errors(#[case] sql: &str, #[case] error: &str) {
+ let err = logical_plan(sql).unwrap_err();
+ assert_eq!(err.to_string(), error)
+}
+
#[test]
fn plan_update() {
let sql = "update person set last_name='Kay' where id=1";
@@ -3464,36 +3511,6 @@ Dml: op=[Insert] table=[person]
prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
}
-#[test]
-#[should_panic(expected = "Placeholder $4 refers to a non existent column")]
-fn test_prepare_statement_insert_infer_gt() {
- let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3, $4)";
-
- let expected_plan = r#""#.trim();
- let expected_dt = "[Int32]";
- let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
-}
-
-#[test]
-#[should_panic(expected = "value: Plan(\"Column count doesn't match insert query!\")")]
-fn test_prepare_statement_insert_infer_lt() {
- let sql = "insert into person (id, first_name, last_name) values ($1, $2)";
-
- let expected_plan = r#""#.trim();
- let expected_dt = "[Int32]";
- let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
-}
-
-#[test]
-#[should_panic(expected = "value: Plan(\"Placeholder type could not be resolved\")")]
-fn test_prepare_statement_insert_infer_gap() {
- let sql = "insert into person (id, first_name, last_name) values ($2, $4, $6)";
-
- let expected_plan = r#""#.trim();
- let expected_dt = "[Int32]";
- let _ = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
-}
-
#[test]
fn test_prepare_statement_to_plan_one_param() {
let sql = "PREPARE my_plan(INT) AS SELECT id, age FROM person WHERE age = $1";