You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/18 20:10:32 UTC
[arrow-datafusion] branch master updated: Add ambiguous check for join (#4258)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 8c02485cf Add ambiguous check for join (#4258)
8c02485cf is described below
commit 8c02485cfc3d2c48bc48a557db58e3e5a0f75777
Author: ygf11 <ya...@gmail.com>
AuthorDate: Sat Nov 19 04:10:26 2022 +0800
Add ambiguous check for join (#4258)
* Add ambiguous check for join
* change DataFusionError::Internal to DataFusionError::Plan
* Update datafusion/sql/src/planner.rs
Co-authored-by: Ruihang Xia <wa...@gmail.com>
Co-authored-by: Ruihang Xia <wa...@gmail.com>
---
datafusion/sql/src/planner.rs | 77 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 77 insertions(+)
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 5d4c47faf..d8672b407 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -718,6 +718,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// parse ON expression
let expr = self.sql_to_rex(sql_expr, &join_schema, ctes)?;
+ // ambiguous check
+ ensure_any_column_reference_is_determined(
+ &expr,
+ &[left.schema().clone(), right.schema().clone()],
+ )?;
+
// normalize all columns in expression
let using_columns = expr.to_columns()?;
let normalized_expr = normalize_col_with_schemas(
@@ -3165,6 +3171,64 @@ fn wrap_projection_for_join_if_necessary(
Ok((plan, need_project))
}
+/// Ensure every unqualified column reference in `expr` resolves to exactly one
+/// field across `schemas`.
+///
+/// Assume we have two schemas:
+/// schema1: a, b, c
+/// schema2: a, d, e
+///
+/// `schema1.a + schema2.a` is determined.
+/// `a + d` is not determined, because `a` may come from schema1 or schema2.
+///
+/// When only one schema is supplied the check is skipped and `Ok(())` is
+/// returned.
+///
+/// # Errors
+///
+/// Returns `DataFusionError::Plan` naming the ambiguous column and every
+/// qualified field it could refer to. If several references are ambiguous,
+/// the one whose name sorts first is reported, so the message is stable.
+fn ensure_any_column_reference_is_determined(
+    expr: &Expr,
+    schemas: &[DFSchemaRef],
+) -> Result<()> {
+    if schemas.len() == 1 {
+        return Ok(());
+    }
+
+    // Only unqualified references can be ambiguous. Sort them by name so the
+    // reported column does not depend on the iteration order of the
+    // collection returned by `to_columns`.
+    let columns = expr.to_columns()?;
+    let mut unqualified: Vec<_> = columns
+        .iter()
+        .filter(|column| column.relation.is_none())
+        .collect();
+    unqualified.sort_by(|a, b| a.name.cmp(&b.name));
+
+    for column in unqualified {
+        // Every field carrying this name, in any schema, is a possible
+        // referent. Scanning the fields directly also keeps duplicates that
+        // live inside a single schema, which a per-schema unqualified lookup
+        // would reject and drop from the message.
+        let candidates = schemas
+            .iter()
+            .flat_map(|schema| schema.fields())
+            .filter(|field| field.name() == &column.name)
+            .map(|field| field.qualified_name())
+            .collect::<Vec<_>>();
+        if candidates.len() > 1 {
+            return Err(DataFusionError::Plan(format!(
+                "reference '{}' is ambiguous, could be {};",
+                column.name,
+                candidates.join(","),
+            )));
+        }
+    }
+
+    Ok(())
+}
+
#[cfg(test)]
mod tests {
use std::any::Any;
@@ -5992,6 +6056,19 @@ mod tests {
quick_test(sql, expected);
}
+    #[test]
+    fn test_ambiguous_column_reference_in_join() {
+        // `id` in the ON clause is unqualified and exists in both join inputs
+        // (p1 and p2 are aliases of `person`), so planning must fail with a
+        // plan error listing both candidates -- not with some unrelated error.
+        let sql = "select p1.id, p1.age, p2.id
+            from person as p1
+            INNER JOIN person as p2
+            ON id = 1";
+
+        match logical_plan(sql) {
+            Err(DataFusionError::Plan(msg)) => assert_eq!(
+                msg,
+                "reference 'id' is ambiguous, could be p1.id,p2.id;"
+            ),
+            other => panic!("expected an ambiguous-reference plan error, got {:?}", other),
+        }
+    }
+
fn assert_field_not_found(err: DataFusionError, name: &str) {
match err {
DataFusionError::SchemaError { .. } => {