You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "ahshahid (via GitHub)" <gi...@apache.org> on 2024/03/01 00:35:52 UTC

[PR] SPARK-47217. bug fix for exception thrown in reused dataframes involv… [spark]

ahshahid opened a new pull request, #45343:
URL: https://github.com/apache/spark/pull/45343

   …ing joins once the plan is de-duplicated. The fix involves using Dataset ID associated with the plans & attributes to attempt correct resolution
   
   There seems to be a regression from Spark 2.4 as the following code no longer succeed:
   
   val df = Seq((1, 2)).toDF("a", "b")
   val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
   val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
   df3.show()
   
   
   === Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===
   !'Join Inner, '`=`(bb#12, b#8)              'Join Inner, '`=`(bb#12, b#18)
    :- Project [a#7 AS aa#11, b#8 AS bb#12]    :- Project [a#7 AS aa#11, b#8 AS bb#12]
    :  +- Project [_1#2 AS a#7, _2#3 AS b#8]   :  +- Project [_1#2 AS a#7, _2#3 AS b#8]
    :     +- LocalRelation [_1#2, _2#3]        :     +- LocalRelation [_1#2, _2#3]
   !+- Project [_1#2 AS a#7, _2#3 AS b#8]      +- Project [_1#15 AS a#17, _2#16 AS b#18]
   !   +- LocalRelation [_1#2, _2#3]              +- LocalRelation [_1#15, _2#16]
   and so Spark 3 thinks df("a") is ambigous:
   Column a#7 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via Dataset.as before joining them, and specify the column using qualified name, e.g. df.as("a").join(df.as("b"), $"a.id" > $"b.id"). You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
   
   If we disable spark.sql.analyzer.failAmbiguousSelfJoin then the real issue reveals: Due to the deduplication the last .select(df2("aa"), df("a")) doesn't work any more.
   
   [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION] Resolved attribute(s) "a" missing from "aa", "bb", "a", "b" in operator !Project [aa#11, a#7]. Attribute(s) with the same name appear in the operation: "a".
   Please check if the right attribute(s) are used. SQLSTATE: XX000;
   !Project [aa#11, a#7]
   +- Join Inner, (bb#12 = b#18)
      :- Project [a#7 AS aa#11, b#8 AS bb#12]
      :  +- Project [_1#2 AS a#7, _2#3 AS b#8]
      :     +- LocalRelation [_1#2, _2#3]
      +- Project [_1#15 AS a#17, _2#16 AS b#18]
         +- LocalRelation [_1#15, _2#16]
   
   Similar issues are seen in nested joins and AsOfJoins where same reference Logical Plans are duplicated in the query .
   
   ### What changes were proposed in this pull request?
   The PR attemps to fix the issue in following way
   1) If the projection fields contain AttributeReference which are not found in the incoming AttributeSet,  and the AttributeRef metadata contains the DatasetId info, then the AttributeRef is converted into a new UnresolvedAttributeWithTag and the original attributeRef is passed as paramter .
   
   2) In the ColumnResolutionHelper,  to resolve the UnresolvedAttributeRefWithTag, a new resolution logic is used:
   The dataSetId from the original attribute ref's metadata is extracted.
   
   3) The first  BinaryNode  contained in the LogicalPlan containing this unresolved attribute,  is found.
   Then its right leg & left lag's unary nodes are checked for the presennce of DatasetID of attribute ref, using TreeNodeTag("__datasetid").
   If both the legs contain datasetId or neither contains, then resolution exception is thrown
   Else the leg which contains datasetId is used to resolve.
   
   
   ### Why are the changes needed?
   To fix the bug as exposed by the unit tests in the PR
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Precheckin run.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "ahshahid (via GitHub)" <gi...@apache.org>.
ahshahid closed pull request #45343: [SPARK-47217][SQL] Fix ambiguity check in self joins
URL: https://github.com/apache/spark/pull/45343


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "ahshahid (via GitHub)" <gi...@apache.org>.
ahshahid commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1522039397


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   @peter-toth 
   As per this PR code change , case like :  df1Joindf2.join(df1, df1("a") === df1("a")) is resolved as both LHS and RHS resolving to same Df1 dataset ( which makes the join crosss product) , and then later this situation is handled via brute force through function "resolveSelfJoinCondition"
   But if the user has put df1Joindf2.join(df1, df1("a") === df1Joindf2("a")), then it gets handled via this PR change  as it is a valid situation.
   
   "But in a select on the join result using df1("a") should be ambigous as df1("a") could be selected from both legs of the join. I.e. both df1Joindf2.select(df1("a")) and df1.select(df1("a")) work."
   
   Based on my understanding of the above example :
   The way I interpret it is:
   val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
   
   The below join condition is resolved by the HACK 
   val df4 = df1Joindf2.join(df1, df1("a") === df1("a"))
   
   where as 
   if it was
   val df4 = df1Joindf2.join(df1, df1("a") === df1Joindf2("a"))
   the above is a natural and logical resolution.
   
   df4 is a join of df1Joindf2 and df1 ( we consider only top level join)
   so in a select on df4.,  IMO  
   df4.select(df1("a) , df1Joindf2("a)), there is no ambiguity as one is being taken from df1 and other from df1joinDf2.
   
   Moreover, the Join Condition in itself  should not effect the output  attributes of the  Join Plan, irrespective of how the join condition is interpreted ( via hack or new code path)
   
   I understand that from point of view of ExprId it can be viewed as ambiguity.  though that ambiguity goes away when viewed from datasetId .
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "ahshahid (via GitHub)" <gi...@apache.org>.
ahshahid commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520395436


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   @peter-toth  Also another source of confusion is that join condition is attempted being resolved on un-deduplicated plan.  Once the join plan is de-duplicated, and then join condition attempted being resolved,  the problem changes from being ambiguous resolution to I suppose both LHS & RHS  resolving to same leg, which is then handled the way of resolution by tag Id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1521535582


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   > I am looking at it from Point of View of user, that is one is coming from df1("a") and another from df1Joindf2("a"), so there is no ambiguity either in Join or Select
   
   I agree that there is no ambiguity in `Join` becuase there we can suppose that the user would like to build a relation between the left and right sides (so one side of the `===` must come from left and the other from the right side). This is why the following join with `df1("a") === df1("a")` also works:
   ```
   val df1 = Seq((1, 2)).toDF("a", "b")
   val df2 = Seq((1, 2)).toDF("aa", "bb")
   val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
   val df4 = df1Joindf2.join(df1, df1("a") === df1("a"))
   df4.explain(true)
   ```
   But in a select on the join result using `df1("a")` should be ambigous as `df1("a")` could be selected from both legs of the join. I.e. both `df1Joindf2.select(df1("a"))` and `df1.select(df1("a"))` work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "ahshahid (via GitHub)" <gi...@apache.org>.
ahshahid commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520323348


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   "Shouldn't selecting df1("a") be ambiguous here? It is not ambiguous in the join condition because df1Joindf2("a") can come from only one side so the df1("a") must come from the other side. But after the join I'm not sure we can follow the same logic."
   
   As I envisage , mentally, for the Join with select as 
                                            Project
                                                   |
                                               Filter    
                                                   |
                                                Join
   
   So what is true for filter in terms of attribute refs should also be true for Project.
   
   But I am not basing my view on above ( i.e in terms of ExprIDs).
   I am looking at it from Point of View of user, that is one is coming from df1("a") and another from  df1Joindf2("a"), so there is no ambiguity either in Join or Select



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520078411


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   Shouldn't selecting `df1("a")` be ambiguous here? It is not ambiguous in the join condition because `df1Joindf2("a")` can come from only one side so the `df1("a")` must come from the other side. But after the join I'm not sure we can follow the same logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520078411


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   Shouldn't selecting `df1("a")` be ambiguous here? It is not ambiguous in the join condition because `df1Joindf2("a")` can come from only one side so the `df1("a")` must come from the other side. But after the join I'm not sure why it shouldn't be ambiguous.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "ahshahid (via GitHub)" <gi...@apache.org>.
ahshahid commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520314613


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   @peter-toth @cloud-fan  I have opened a new PR [SPARK-47320-PR](https://github.com/apache/spark/pull/45446).
   It contains more information and also an existing master's test which fails when join order reversed.
   Currently this PR is identical to SPARK-47320-PR ..I will try to see if this PR can be splitted , else will close it and keep the SPARK-47320-PR as the only PR for review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45959][SQL] : Deduplication of relations causes ambiguous exception to be raised, though logically it is Unambiguous [spark]

Posted by "ahshahid (via GitHub)" <gi...@apache.org>.
ahshahid commented on PR #45343:
URL: https://github.com/apache/spark/pull/45343#issuecomment-1982020269

   While fixing the issue, found that tests in DataFrameSelfJoinSuite, where the two legs have distinct datasetId , the tests are expecting AnalysisException due to ambiguity in the join condition, which IMO is not correct as it is possible to dis-ambiguate using resolution via Dataset ID.
   This PR fixes the above and have test modifications accordingly.
   The same applies to some tests in DataFrameAsofJoinSuite.
   I believe there is still need to do some enhancement  in the existing  source code related to AsofJoin, as the condition part is still not trying to disambiguate properly ( unlike the changes done for Join). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520115101


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   cc @cloud-fan 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "ahshahid (via GitHub)" <gi...@apache.org>.
ahshahid commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1522039397


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   @peter-toth 
   As per this PR code change , case like :  df1Joindf2.join(df1, df1("a") === df1("a")) is resolved as both LHS and RHS resolving to same Df1 dataset ( which makes the join crosss product) , and then later this situation is handled via brute force through function "resolveSelfJoinCondition"
   But if the user has put df1Joindf2.join(df1, df1("a") === df1Joindf2("a")), then it gets handled via this PR change  as it is a valid situation.
   
   "But in a select on the join result using df1("a") should be ambigous as df1("a") could be selected from both legs of the join. I.e. both df1Joindf2.select(df1("a")) and df1.select(df1("a")) work."
   
   Based on my understanding of the above example :
   The way I interpret it is:
   val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
   
   This join condition is resolved by the HACK 
   val df4 = df1Joindf2.join(df1, df1("a") === df1("a"))
   
   where as 
   if it was
   val df4 = df1Joindf2.join(df1, df1("a") === df1Joindf2("a"))
   the above is a natural and logical resolution.
   
   df4 is a join of df1Joindf2 and df1 ( we consider only top level join)
   so in a select on df4.,  IMO  
   df4.select(df1("a) , df1Joindf2("a)), there is no ambiguity as one is being taken from df1 and other from df1joinDf2.
   
   Moreover, the Join Condition in itself  should not effect the output  attributes of the  Join Plan, irrespective of how the join condition is interpreted ( via hack or new code path)
   
   I understand that from point of view of ExprId it can be viewed as ambiguity.  though that ambiguity goes away when viewed from datasetId .
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "ahshahid (via GitHub)" <gi...@apache.org>.
ahshahid commented on PR #45343:
URL: https://github.com/apache/spark/pull/45343#issuecomment-1996044845

   The PR #45446 handles the issue comprehensively and this PR was a subset of the changes contained in PR #45446 , so closing this PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1520049076


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {

Review Comment:
   Can you please use `:` instead of `.` in test names? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47217][SQL] Fix ambiguity check in self joins [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #45343:
URL: https://github.com/apache/spark/pull/45343#discussion_r1521535582


##########
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala:
##########
@@ -498,4 +559,70 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       assert(df1.join(df2, $"t1.i" === $"t2.i").cache().count() == 1)
     }
   }
+
+  test("SPARK_47217: deduplication of project causes ambiguity in resolution") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
+    val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
+    checkAnswer(
+      df3,
+      Row(1, 1) :: Nil)
+  }
+
+  test("SPARK-47217. deduplication in nested joins with join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a").as("aaa"),
+      df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join1 = proj1.child.asInstanceOf[Join]
+    assert(proj1.projectList(0).references.subsetOf(join1.left.outputSet))
+    assert(proj1.projectList(1).references.subsetOf(join1.right.outputSet))
+
+    val proj2 = df1.join(df1Joindf2, df1Joindf2("aaa") === df1("a")).select(df1Joindf2("aa"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]
+    val join2 = proj2.child.asInstanceOf[Join]
+    assert(proj2.projectList(0).references.subsetOf(join2.right.outputSet))
+    assert(proj2.projectList(1).references.subsetOf(join2.left.outputSet))
+  }
+
+  test("SPARK-47217. deduplication in nested joins without join attribute aliased") {
+    val df1 = Seq((1, 2)).toDF("a", "b")
+    val df2 = Seq((1, 2)).toDF("aa", "bb")
+    val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
+
+    assertCorrectResolution(df1Joindf2.join(df1, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToLeftLeg, Resolution.RightConditionToRightLeg)
+
+    assertCorrectResolution(df1.join(df1Joindf2, df1Joindf2("a") === df1("a")),
+      Resolution.LeftConditionToRightLeg, Resolution.RightConditionToLeftLeg)
+
+    val proj1 = df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1Joindf2("a"),
+      df1("a")).queryExecution.analyzed.asInstanceOf[Project]

Review Comment:
   > I am looking at it from Point of View of user, that is one is coming from df1("a") and another from df1Joindf2("a"), so there is no ambiguity either in Join or Select
   
   I agree that there is no ambiguity in `Join` becuase there we can suppose that the user would like to build a relation between the left and right sides (so one side of the `===` must come from left and the other from the right side). This is why the following join with `df1("a") === df1("a")` also works:
   ```
   val df1 = Seq((1, 2)).toDF("a", "b")
   val df2 = Seq((1, 2)).toDF("aa", "bb")
   val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
   val df4 = df1Joindf2.join(df1, df1("a") === df1("a"))
   df4.explain(true)
   ```
   But in a select on the join result using `df1("a")` should be ambigous as `df1("a")` could be selected from both legs of the join. I.e. both `df1Joindf2.select(df1("a"))` and `df1.select(df1("a"))` works.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org