You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/18 20:10:32 UTC

[arrow-datafusion] branch master updated: Add ambiguous check for join (#4258)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c02485cf Add ambiguous check for join (#4258)
8c02485cf is described below

commit 8c02485cfc3d2c48bc48a557db58e3e5a0f75777
Author: ygf11 <ya...@gmail.com>
AuthorDate: Sat Nov 19 04:10:26 2022 +0800

    Add ambiguous check for join (#4258)
    
    * Add ambiguous check for join
    
    * change DataFusionError::Internal to DataFusionError::Plan
    
    * Update datafusion/sql/src/planner.rs
    
    Co-authored-by: Ruihang Xia <wa...@gmail.com>
    
    Co-authored-by: Ruihang Xia <wa...@gmail.com>
---
 datafusion/sql/src/planner.rs | 77 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 77 insertions(+)

diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 5d4c47faf..d8672b407 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -718,6 +718,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 // parse ON expression
                 let expr = self.sql_to_rex(sql_expr, &join_schema, ctes)?;
 
+                // ambiguous check
+                ensure_any_column_reference_is_determined(
+                    &expr,
+                    &[left.schema().clone(), right.schema().clone()],
+                )?;
+
                 // normalize all columns in expression
                 let using_columns = expr.to_columns()?;
                 let normalized_expr = normalize_col_with_schemas(
@@ -3165,6 +3171,64 @@ fn wrap_projection_for_join_if_necessary(
     Ok((plan, need_project))
 }
 
+/// Ensure no unqualified column referenced by `expr` names a field that occurs more than once in `schemas`.
+/// For example, assume we have two schemas:
+/// schema1: a, b, c
+/// schema2: a, d, e
+///
+/// `schema1.a + schema2.a` is determined, because both references carry a relation.
+/// `a + d` is not determined (`a` may come from schema1 or schema2), so `DataFusionError::Plan` is returned.
+fn ensure_any_column_reference_is_determined(
+    expr: &Expr,
+    schemas: &[DFSchemaRef],
+) -> Result<()> {
+    if schemas.len() == 1 { // a single schema is accepted without any check
+        return Ok(());
+    }
+
+    // count how many times each field name occurs across all schemas.
+    let mut column_count_map: HashMap<String, usize> = HashMap::new();
+    schemas
+        .iter()
+        .flat_map(|schema| schema.fields())
+        .for_each(|field| {
+            column_count_map
+                .entry(field.name().into())
+                .and_modify(|v| *v += 1)
+                .or_insert(1usize);
+        });
+
+    let duplicated_column_set = column_count_map // keep only the names counted more than once
+        .into_iter()
+        .filter_map(|(column, count)| if count > 1 { Some(column) } else { None })
+        .collect::<HashSet<String>>();
+
+    // look for a column of `expr` that has no relation and whose name is duplicated.
+    let using_columns = expr.to_columns()?;
+    let ambiguous_column = using_columns.iter().find(|column| {
+        column.relation.is_none() && duplicated_column_set.contains(&column.name)
+    });
+
+    if let Some(column) = ambiguous_column {
+        let maybe_field = schemas // qualified names of the candidate fields, listed in the error message
+            .iter()
+            .flat_map(|schema| {
+                schema
+                    .field_with_unqualified_name(&column.name)
+                    .map(|f| f.qualified_name())
+                    .ok() // schemas where the lookup fails contribute no candidate
+            })
+            .collect::<Vec<_>>();
+        Err(DataFusionError::Plan(format!(
+            "reference \'{}\' is ambiguous, could be {};",
+            column.name,
+            maybe_field.join(","),
+        )))
+    } else {
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;
@@ -5992,6 +6056,19 @@ mod tests {
         quick_test(sql, expected);
     }
 
+    #[test]
+    fn test_ambiguous_coulmn_referece_in_join() { // NOTE(review): name misspells "column_reference"
+        // The ON clause uses `id` without a qualifier; the error message will be:
+        // reference 'id' is ambiguous, could be p1.id,p2.id;
+        let sql = "select p1.id, p1.age, p2.id 
+            from person as p1 
+            INNER JOIN person as p2 
+            ON id = 1";
+
+        // Only the failure itself is asserted; the message text above is not checked.
+        assert!(logical_plan(sql).is_err());
+    }
+
     fn assert_field_not_found(err: DataFusionError, name: &str) {
         match err {
             DataFusionError::SchemaError { .. } => {