You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/23 05:54:36 UTC

[GitHub] [arrow-datafusion] houqp opened a new pull request #605: fix join column handling logic for `On` and `Using` constraints

houqp opened a new pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605


   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   Follow up for https://github.com/apache/arrow-datafusion/pull/55#pullrequestreview-683118433.
   
   Closes #601.
   
   Also fixed a bug where `index_of_column` won't error out on duplicated fields.
   
    # Rationale for this change
   
   In MySQL and Postgres, Join with `On` constraints produces output with join column from both relations preserved. For example:
   
   ```sql
   SELECT * FROM test t1 JOIN test t2 ON t1.id = t2.id;
   ```
   produces:
   
   | id  | id  |
   | --- | --- |
   | 1   | 1   |
   | 2   | 2   |
   
   While join with `Using` constraints deduplicates the join column:
   
   ```sql
   SELECT * FROM test t1 JOIN test t2 USING (id);
   ```
   
   produces:
   
   | id  |
   | --- |
   | 1   |
   | 2   |
   
   However, in our current implementation, join column dedup is applied in all cases. This PR changes the behavior so it's consistent with MySQL and Postgres.
   
   Here comes the annoying part.
   
   Note that when it comes to join with `Using` constraint, users can still project join columns using relations from both sides. For example `SELECT t1.id, t2.id FROM test t1 JOIN test t2 USING (id)` produces the same output as `SELECT * FROM test t1 JOIN test t2 ON t1.id = t2.id`. This means for `Using` joins, we need to model a join column as a single shared column between both relations. Current `DFField` struct only allows a field/column to have a single qualifier, so I ended adding a new `shared_qualifiers` field to `DFField` struct to handle this edge-case. Our logical plan builder will be responsible for setting this field when building join queries with using constraints. During query optimization and planning, the `shared_qualifiers` field is used to look up column by name and qualifier.
   
   Other alternatives include changing the `qualifer` field of `DFField` to an option of enum to account for single qualifier and shared qualifiers. None of these approaches felt elegant to me. I am curious if anyone has ideas or suggestions on how to better implement this behavior.
   
   # What changes are included in this PR?
   
   * Expose JoinConstraints to physical plane
   * Expose JoinConstraints to ballista.proto
   * Unify JoinType enum between logical and physical planes
   * Added context execution tests to enforce semantics for `On` and `Using` joins
   * Refactored dfschema module to use `index_of_column_by_name` in `both index_of_column` and `field_with_qualified_name` methods.
   * Added `shared_qualifiers` filed to DFSchema struct
   
   # Are there any user-facing changes?
   
   Join with `On` constraints will now output joined columns from both relations without deduplication. Dedup is only applied to join with `Using` constraints.
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r663445397



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(

Review comment:
       Refactored this code into its own helper method to reduce duplicated code between `field_with_qualified_name` and `index_of_column`. This should also make `index_of_column` more robust.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-869141998


   Marking as draft so it is clearer from the list of PRs that this one is not quite ready to go (and thus I don't merge it accidentally)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-874026642


   Thanks @houqp  -- I'll try and review this later today but I may run out of time in which case I'll get it done tomorrow


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-873534193


   @alamb reimplemented the logic based on your suggestion in https://github.com/apache/arrow-datafusion/pull/605#issuecomment-866862804.
   
   Turns out there are many more edge-cases that need to be handled for using join other than wildcard expansion:
   
   * predicate push down on join columns
   * normalize unqualified column expressions that reference join columns with qualifiers
   
   I have implemented support for all these edge-cases, but decided to leave out the wildcard expansion change as a follow up PR to keep the diff easier to review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r656785924



##########
File path: datafusion/src/physical_plan/hash_utils.rs
##########
@@ -21,25 +21,9 @@ use crate::error::{DataFusionError, Result};
 use arrow::datatypes::{Field, Schema};
 use std::collections::HashSet;
 
+use crate::logical_plan::{JoinConstraint, JoinType};
 use crate::physical_plan::expressions::Column;
 
-/// All valid types of joins.
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-pub enum JoinType {

Review comment:
       reuse the same enum from logical plane.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-877694442


   Apologize for being too busy last week and haven't had the time to update my PR, I am going to send a new PR to address all the feedbacks shortly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-866862804


   @houqp 
   
   The approach we seem to be taking is to try and keep the information about where a column came from in a join -- e.g. that a output DF field could be referred to as either `f1.foo` or `f2.foo` for example, which is getting complicated
   
   It seems like a core challenge is in the semantics of the `*` expansion in `select * from ...` type queries which varies depending on type of join. Have you considered perhaps focusing in that area rather than trying to track the optional qualifiers through the plans?
   
   So for example, in a `f1 JOIN f1` have the output schema contain both `f1.foo` and `f2.foo` but then change the expansion of `*` to have somethign like `f1.foo as foo`?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r663445283



##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -440,65 +424,19 @@ impl LogicalPlanBuilder {
 pub fn build_join_schema(
     left: &DFSchema,
     right: &DFSchema,
-    on: &[(Column, Column)],
     join_type: &JoinType,
-    join_constraint: &JoinConstraint,
 ) -> Result<DFSchema> {
     let fields: Vec<DFField> = match join_type {
-        JoinType::Inner | JoinType::Left | JoinType::Full => {
-            let duplicate_keys = match join_constraint {
-                JoinConstraint::On => on
-                    .iter()
-                    .filter(|(l, r)| l == r)
-                    .map(|on| on.1.clone())
-                    .collect::<HashSet<_>>(),
-                // using join requires unique join columns in the output schema, so we mark all
-                // right join keys as duplicate
-                JoinConstraint::Using => {
-                    on.iter().map(|on| on.1.clone()).collect::<HashSet<_>>()
-                }
-            };
-
+        JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {

Review comment:
       Join schemas are now consistent across all join types and constraints. We implement the join column "merge" semantic externally in plan builder and optimizer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664709088



##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -89,14 +89,46 @@ impl Column {
     ///
     /// For example, `foo` will be normalized to `t.foo` if there is a
     /// column named `foo` in a relation named `t` found in `schemas`
-    pub fn normalize(self, schemas: &[&DFSchemaRef]) -> Result<Self> {
+    pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
         if self.relation.is_some() {
             return Ok(self);
         }
 
-        for schema in schemas {
-            if let Ok(field) = schema.field_with_unqualified_name(&self.name) {
-                return Ok(field.qualified_column());
+        let schemas = plan.all_schemas();
+        let using_columns = plan.using_columns()?;
+
+        for schema in &schemas {
+            let fields = schema.fields_with_unqualified_name(&self.name);
+            match fields.len() {
+                0 => continue,

Review comment:
       We are iterating through schemas from all plan nodes in the provided plan tree, each plan node could have different schemas, so when we do the `fields_with_unqualified_name` look up, some of these plan nodes will no contain a field that matches `self.name`. We just pick the first one that matches the unqualified name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r663447375



##########
File path: datafusion/src/sql/planner.rs
##########
@@ -560,7 +560,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 //   SELECT c1 AS m FROM t HAVING c1 > 10;
                 //   SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING MAX(c2) > 10;
                 //
-                resolve_aliases_to_exprs(&having_expr, &alias_map)
+                let having_expr = resolve_aliases_to_exprs(&having_expr, &alias_map)?;
+                normalize_col(having_expr, &projected_plan)

Review comment:
       needed if having expression referenced using join columns with unqualified column name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r657121001



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1259,6 +1259,96 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn left_join_using() -> Result<()> {

Review comment:
       👍  definitely an improvement
   

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -387,6 +402,9 @@ impl Display for DFSchema {
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
     qualifier: Option<String>,
+    /// Optional set of qualifiers that all share this same field. This is used for `JOIN USING`

Review comment:
       it is confusing to me that the field would have multiple shared_qualifiers or a single qualifier or maybe both.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r667387968



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp edited a comment on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp edited a comment on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-867776830


   let's wait for my alternative solutions and review them together before the merge unless there is urgent issue in master that this PR addresses. I would like to avoid merging in a premature abstraction to reduce noise :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r657376400



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -387,6 +402,9 @@ impl Display for DFSchema {
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
     qualifier: Option<String>,
+    /// Optional set of qualifiers that all share this same field. This is used for `JOIN USING`

Review comment:
       Yeah, this is the part I am not a fan about the current implementation, a field having both shared_qualifiers and single qualifier should never happen. An optional enum type on qualifier field would provide the correct semantic we want to capture here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r667379802



##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {

Review comment:
       @alamb this is used outside of the optimizers as well, for example, the logical plan builder. With that context, do you still think it should live in optimizer utils module?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664542089



##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1118,36 +1133,56 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
     }
 }
 
+/// Recursively replace all Column expressions in a given expression tree with Column expressions
+/// provided by the hash map argument.
+pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<Expr> {

Review comment:
       use `pub(crate)`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r667387968



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       Good suggestion, fixed in https://github.com/apache/arrow-datafusion/pull/703.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664540946



##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -89,14 +89,46 @@ impl Column {
     ///
     /// For example, `foo` will be normalized to `t.foo` if there is a
     /// column named `foo` in a relation named `t` found in `schemas`
-    pub fn normalize(self, schemas: &[&DFSchemaRef]) -> Result<Self> {
+    pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
         if self.relation.is_some() {
             return Ok(self);
         }
 
-        for schema in schemas {
-            if let Ok(field) = schema.field_with_unqualified_name(&self.name) {
-                return Ok(field.qualified_column());
+        let schemas = plan.all_schemas();
+        let using_columns = plan.using_columns()?;
+
+        for schema in &schemas {
+            let fields = schema.fields_with_unqualified_name(&self.name);
+            match fields.len() {
+                0 => continue,

Review comment:
       how would this be possible?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r657382798



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -387,6 +402,9 @@ impl Display for DFSchema {
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
     qualifier: Option<String>,
+    /// Optional set of qualifiers that all share this same field. This is used for `JOIN USING`

Review comment:
       I am going to try the enum approach as an alternative implementation, then we can take a look at it to see which one looks better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664543555



##########
File path: datafusion/src/optimizer/filter_push_down.rs
##########
@@ -96,12 +96,21 @@ fn get_join_predicates<'a>(
     let left_columns = &left
         .fields()
         .iter()
-        .map(|f| f.qualified_column())
+        .map(|f| {
+            std::iter::once(f.qualified_column())
+                // we need to push down filter using unqualified column as well
+                .chain(std::iter::once(f.unqualified_column()))

Review comment:
       same as https://github.com/apache/arrow-datafusion/pull/605/files#r664543206

##########
File path: datafusion/src/optimizer/filter_push_down.rs
##########
@@ -96,12 +96,21 @@ fn get_join_predicates<'a>(
     let left_columns = &left
         .fields()
         .iter()
-        .map(|f| f.qualified_column())
+        .map(|f| {
+            std::iter::once(f.qualified_column())
+                // we need to push down filter using unqualified column as well
+                .chain(std::iter::once(f.unqualified_column()))
+        })
+        .flatten()
         .collect::<HashSet<_>>();
     let right_columns = &right
         .fields()
         .iter()
-        .map(|f| f.qualified_column())
+        .map(|f| {
+            std::iter::once(f.qualified_column())

Review comment:
       same as https://github.com/apache/arrow-datafusion/pull/605/files#r664543206




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-867081932


   Good catch on the regressions, I will get them fixed tonight with unit tests.
   
   > It seems like a core challenge is in the semantics of the * expansion in select * from ... type queries which varies depending on type of join.
   
   Yeah, that's exactly the problem.
   
   Initially, I tried the approach of keeping the join columns from both relations (i.e. `f1.foo` and `f2.foo`) in the logical schema. Then I ran into the problem of it breaking the [physical planning invariants](https://github.com/apache/arrow-datafusion/blob/master/docs/specification/invariants.md#the-physical-schema-is-invariant-under-planning). Because for `Using` join, we are only producing a single join column in the output, the physical plan schema can only has a single field for the join column. The schema from the logical plan will need to match that as well in order to honor that invariant.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r663445635



##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1118,36 +1133,56 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
     }
 }
 
+/// Recursively replace all Column expressions in a given expression tree with Column expressions
+/// provided by the hash map argument.
+pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<Expr> {
+    struct ColumnReplacer<'a> {
+        replace_map: &'a HashMap<&'a Column, &'a Column>,
+    }
+
+    impl<'a> ExprRewriter for ColumnReplacer<'a> {
+        fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+            if let Expr::Column(c) = &expr {
+                match self.replace_map.get(c) {
+                    Some(new_c) => Ok(Expr::Column((*new_c).to_owned())),
+                    None => Ok(expr),
+                }
+            } else {
+                Ok(expr)
+            }
+        }
+    }
+
+    e.rewrite(&mut ColumnReplacer { replace_map })
+}
+
 /// Recursively call [`Column::normalize`] on all Column expressions
 /// in the `expr` expression tree.
-pub fn normalize_col(e: Expr, schemas: &[&DFSchemaRef]) -> Result<Expr> {
-    struct ColumnNormalizer<'a, 'b> {
-        schemas: &'a [&'b DFSchemaRef],
+pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> {

Review comment:
       Change from schema to logical plan so we can extract join columns for join clauses with using constraints.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r667388265



##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       @Jimexist sent https://github.com/apache/arrow-datafusion/pull/704 to address this. I found a comprise that's more readable and avoids the extra memory allocation.
   
   Here is my test to compare the code gen with different approaches: https://godbolt.org/z/j3dcjbnvM.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-867168227


   > Because for Using join, we are only producing a single join column in the output
   
   is this something we can change (aka update the using join to produce columns and then rely on projection pushdown to prune the uneeded ones out?)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-867695727


   > Perhaps you rain those tests with a different build?
   
   I am not sure what I tested with, but I re-ran the tests at b8da356a4a07a2733f5b260f763198a114f60fc7 and everything is working -- sorry for the noise. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664538972



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,

Review comment:
       prefer to write out cases, `(None, None)` and `(None, Some(_)` and union them. this makes it clearer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r657469588



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -387,6 +402,9 @@ impl Display for DFSchema {
 pub struct DFField {
     /// Optional qualifier (usually a table or relation name)
     qualifier: Option<String>,
+    /// Optional set of qualifiers that all share this same field. This is used for `JOIN USING`

Review comment:
       sounds good




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r663445568



##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1118,36 +1133,56 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
     }
 }
 
+/// Recursively replace all Column expressions in a given expression tree with Column expressions
+/// provided by the hash map argument.
+pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<Expr> {

Review comment:
       this is used in predicate push down optimizer to push predicates to both sides of the join clause.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp edited a comment on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp edited a comment on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-873534193


   @alamb reimplemented the logic based on your suggestion in https://github.com/apache/arrow-datafusion/pull/605#issuecomment-866862804.
   
   Turns out there are many more edge-cases that need to be handled for using join other than wildcard expansion:
   
   * predicate push down on join columns
   * normalize unqualified column expressions that reference join columns with qualifiers
   
   I have implemented support for all these edge-cases, but decided to leave out the wildcard expansion change as a follow up PR to keep the diff easier to review.
   
   UPDATE: filed https://github.com/apache/arrow-datafusion/pull/678 as follow up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-867199051


   > is this something we can change (aka update the using join to produce columns and then rely on projection pushdown to prune the uneeded ones out?)
   
   That's a good point, the single column output semantic doesn't need to be enforced at the join node level. let me give this a try as well :+1:  This could simplify our join handling logic. I will also double check to see if this would result in nontrivial runtime overhead for us.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r663444912



##########
File path: ballista/rust/core/src/serde/logical_plan/from_proto.rs
##########
@@ -257,23 +257,32 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                             join.join_type
                         ))
                     })?;
-                let join_type = match join_type {
-                    protobuf::JoinType::Inner => JoinType::Inner,
-                    protobuf::JoinType::Left => JoinType::Left,
-                    protobuf::JoinType::Right => JoinType::Right,
-                    protobuf::JoinType::Full => JoinType::Full,
-                    protobuf::JoinType::Semi => JoinType::Semi,
-                    protobuf::JoinType::Anti => JoinType::Anti,
-                };

Review comment:
       conversion handled by shared method in `serde/mod.rs`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-867342777


   @alamb, I wasn't able to reproduce the errors you showed in https://github.com/apache/arrow-datafusion/pull/605#pullrequestreview-690682368
   
   Here is what I got:
   
   ```
   > CREATE EXTERNAL TABLE foo(bar int)
   STORED AS CSV
   LOCATION '/tmp/foo.csv';
   0 rows in set. Query took 0.001 seconds.
   > select f1.bar, f2.bar from foo as f1 JOIN foo as f2 ON f1.bar = f2.bar;
   +-----+-----+
   | bar | bar |
   +-----+-----+
   | 3   | 3   |
   | 1   | 1   |
   | 4   | 4   |
   | 2   | 2   |
   +-----+-----+
   4 rows in set. Query took 0.022 seconds.
   >  select * from foo as f1 JOIN foo as f2 ON f1.bar = f2.bar;
   +-----+-----+
   | bar | bar |
   +-----+-----+
   | 3   | 3   |
   | 1   | 1   |
   | 4   | 4   |
   | 2   | 2   |
   +-----+-----+
   4 rows in set. Query took 0.022 seconds.
   > select f1.bar, f2.bar from foo as f1 JOIN foo as f2 USING(bar);
   +-----+-----+
   | bar | bar |
   +-----+-----+
   | 2   | 2   |
   | 3   | 3   |
   | 1   | 1   |
   | 4   | 4   |
   +-----+-----+
   4 rows in set. Query took 0.020 seconds.
   > 
   ```
   
   Perhaps you rain those tests with a different build?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb merged pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-874026642


   Thanks @houqp  -- I'll try and review this later today but I may run out of time in which case I'll get it done tomorrow


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-875554378


   Aaand it looks like I forgot to fetch `apache/master` on my machine prior to testing -- fixed in https://github.com/apache/arrow-datafusion/pull/694


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-867776830


   let's wait for my alternative solutions and review them together before the merge unless this is urgent issue in master that this PR addresses. I would like to avoid merging in a premature abstraction to reduce noise :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Jimexist commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
Jimexist commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664543206



##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       why not just flatmap with a `vec![entry.0.clone(), entry.1.clone()]` - it's cleaner




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-875546365


   I am going to merge this PR in (as it conflicts with https://github.com/apache/arrow-datafusion/pull/689) -- I think we can make additional improvements as follow on PRs. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664735397



##########
File path: datafusion/src/logical_plan/dfschema.rs
##########
@@ -149,47 +147,80 @@ impl DFSchema {
         )))
     }
 
-    /// Find the index of the column with the given qualifer and name
-    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
-        for i in 0..self.fields.len() {
-            let field = &self.fields[i];
-            if field.qualifier() == col.relation.as_ref() && field.name() == &col.name {
-                return Ok(i);
-            }
+    fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&str>,
+        name: &str,
+    ) -> Result<usize> {
+        let matches: Vec<usize> = self
+            .fields
+            .iter()
+            .enumerate()
+            .filter(|(_, field)| match (qualifier, &field.qualifier) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifer and name.
+                (Some(q), Some(field_q)) => q == field_q && field.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                _ => field.name() == name,
+            })
+            .map(|(idx, _)| idx)
+            .collect();
+

Review comment:
       it probably doesn't matter but you could avoid the Vec allocation by something like:
   
   ```rust
           let matches = self....; 
           match matches.next() {
           let name = matches.next() {
             None => // error about no field
             Some(name) { 
               if matches.next().is_some() => // error about ambiguous reference
               else name
             }
   ```

##########
File path: datafusion/src/optimizer/filter_push_down.rs
##########
@@ -901,20 +979,61 @@ mod tests {
             format!("{:?}", plan),
             "\
             Filter: #test.a LtEq Int64(1)\
-            \n  Join: #test.a = #test.a\
+            \n  Join: #test.a = #test2.a\

Review comment:
       👍 

##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {

Review comment:
       FWIW this function feels like it might better belong in some sort of utils rather than a method on `LogicalPlan` -- perhaps https://github.com/houqp/arrow-datafusion/blob/qp_join/datafusion/src/optimizer/utils.rs#L50 

##########
File path: datafusion/src/optimizer/filter_push_down.rs
##########
@@ -232,6 +241,38 @@ fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
     }
 }
 
+fn optimize_join(

Review comment:
       Nice

##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -1996,16 +1992,16 @@ mod tests {
 
         let (columns, batches) = join_collect(left, right, on, &JoinType::Right).await?;
 
-        assert_eq!(columns, vec!["a1", "c1", "a2", "b1", "c2"]);
+        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]);
 
         let expected = vec![
-            "+----+----+----+----+----+",
-            "| a1 | c1 | a2 | b1 | c2 |",
-            "+----+----+----+----+----+",
-            "|    |    | 30 | 6  | 90 |",
-            "| 1  | 7  | 10 | 4  | 70 |",
-            "| 2  | 8  | 20 | 5  | 80 |",
-            "+----+----+----+----+----+",
+            "+----+----+----+----+----+----+",

Review comment:
       these changes make sense to me

##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1118,36 +1133,56 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
     }
 }
 
+/// Recursively replace all Column expressions in a given expression tree with Column expressions
+/// provided by the hash map argument.
+pub fn replace_col(e: Expr, replace_map: &HashMap<&Column, &Column>) -> Result<Expr> {
+    struct ColumnReplacer<'a> {
+        replace_map: &'a HashMap<&'a Column, &'a Column>,
+    }
+
+    impl<'a> ExprRewriter for ColumnReplacer<'a> {
+        fn mutate(&mut self, expr: Expr) -> Result<Expr> {
+            if let Expr::Column(c) = &expr {
+                match self.replace_map.get(c) {
+                    Some(new_c) => Ok(Expr::Column((*new_c).to_owned())),
+                    None => Ok(expr),
+                }
+            } else {
+                Ok(expr)
+            }
+        }
+    }
+
+    e.rewrite(&mut ColumnReplacer { replace_map })
+}
+
 /// Recursively call [`Column::normalize`] on all Column expressions
 /// in the `expr` expression tree.
-pub fn normalize_col(e: Expr, schemas: &[&DFSchemaRef]) -> Result<Expr> {
-    struct ColumnNormalizer<'a, 'b> {
-        schemas: &'a [&'b DFSchemaRef],
+pub fn normalize_col(e: Expr, plan: &LogicalPlan) -> Result<Expr> {

Review comment:
       Note I wrote some tests that will need to be adjusted in https://github.com/apache/arrow-datafusion/pull/689 but that is no big deal




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-874946505


   Not sure if this will conflict with https://github.com/apache/arrow-datafusion/pull/660


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664709088



##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -89,14 +89,46 @@ impl Column {
     ///
     /// For example, `foo` will be normalized to `t.foo` if there is a
     /// column named `foo` in a relation named `t` found in `schemas`
-    pub fn normalize(self, schemas: &[&DFSchemaRef]) -> Result<Self> {
+    pub fn normalize(self, plan: &LogicalPlan) -> Result<Self> {
         if self.relation.is_some() {
             return Ok(self);
         }
 
-        for schema in schemas {
-            if let Ok(field) = schema.field_with_unqualified_name(&self.name) {
-                return Ok(field.qualified_column());
+        let schemas = plan.all_schemas();
+        let using_columns = plan.using_columns()?;
+
+        for schema in &schemas {
+            let fields = schema.fields_with_unqualified_name(&self.name);
+            match fields.len() {
+                0 => continue,

Review comment:
       We are iterating through schemas from all plan nodes in the provided plan tree, each plan node could have different schemas, so when we do the `fields_with_unqualified_name` look up, some of these plan nodes will no contain a field that matches `self.name`. We just pick the first plan node that contains schema field matches the unqualified name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-875546431


   Thanks again @houqp 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r664706662



##########
File path: datafusion/src/logical_plan/plan.rs
##########
@@ -354,6 +356,43 @@ impl LogicalPlan {
             | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
+
+    /// returns all `Using` join columns in a logical plan
+    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+        struct UsingJoinColumnVisitor {
+            using_columns: Vec<HashSet<Column>>,
+        }
+
+        impl PlanVisitor for UsingJoinColumnVisitor {
+            type Error = DataFusionError;
+
+            fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+                if let LogicalPlan::Join {
+                    join_constraint: JoinConstraint::Using,
+                    on,
+                    ..
+                } = plan
+                {
+                    self.using_columns.push(
+                        on.iter()
+                            .map(|entry| {
+                                std::iter::once(entry.0.clone())
+                                    .chain(std::iter::once(entry.1.clone()))

Review comment:
       i wanted to avoid an extra memory allocation incurred by `Vec::new`, but i will double check to see if once chain is actually generating the optimal code without memory allocations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] houqp commented on a change in pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#discussion_r663447267



##########
File path: datafusion/src/physical_plan/hash_utils.rs
##########
@@ -104,46 +88,11 @@ fn check_join_set_is_valid(
 
 /// Creates a schema for a join operation.
 /// The fields from the left side are first
-pub fn build_join_schema(
-    left: &Schema,
-    right: &Schema,
-    on: JoinOnRef,
-    join_type: &JoinType,
-) -> Schema {
+pub fn build_join_schema(left: &Schema, right: &Schema, join_type: &JoinType) -> Schema {

Review comment:
       same schema build simplification as the one we introduced in logical plan builder.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #605: fix join column handling logic for `On` and `Using` constraints

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #605:
URL: https://github.com/apache/arrow-datafusion/pull/605#issuecomment-875547555


   I also merge `apache/master` into this branch locally on my machine and re-ran the tests to verify there were no conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org