Posted to reviews@spark.apache.org by "peter-toth (via GitHub)" <gi...@apache.org> on 2024/03/17 18:00:18 UTC

[PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

peter-toth opened a new pull request, #45552:
URL: https://github.com/apache/spark/pull/45552

   ### What changes were proposed in this pull request?
   There seems to be a regression from Spark 3.5 to 4.0, caused by https://issues.apache.org/jira/browse/SPARK-43838 / https://github.com/apache/spark/pull/41347, as the following code no longer succeeds:
   ```
   val df = Seq((1, 2)).toDF("a", "b")
   val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
   val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
   df3.show()
   ```
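
   (For reference: this succeeds on Spark 3.5. Given the single input row, the join matches once, so `df3.show()` is expected to print the following; output reconstructed from the repro, not copied from a run.)
   ```
   +---+---+
   | aa|  a|
   +---+---+
   |  1|  1|
   +---+---+
   ```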
   
   Please note that if we dig deeper, it turns out that if we omit the extra project from `df`, the following is actually a bug that came in with the very first version of `DeduplicateRelations`, which deduplicated `MultiInstanceRelation`s only:
   ```
   val schema = StructType.fromDDL("a int, b int")
   val rows = Seq(Row(1, 2))
   val rdd = sparkContext.parallelize(rows)
   val df = spark.createDataFrame(rdd, schema)
   val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
   val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
   df3.show()
   ```
   
   The root cause seems to be `DeduplicateRelations`, as it changes `df("a")` (`a#7`) coming from the right side when it runs on the join:
   ```
   === Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===
   !'Join Inner, '`=`(bb#12, b#8)              'Join Inner, '`=`(bb#12, b#18)
    :- Project [a#7 AS aa#11, b#8 AS bb#12]    :- Project [a#7 AS aa#11, b#8 AS bb#12]
    :  +- Project [_1#2 AS a#7, _2#3 AS b#8]   :  +- Project [_1#2 AS a#7, _2#3 AS b#8]
    :     +- LocalRelation [_1#2, _2#3]        :     +- LocalRelation [_1#2, _2#3]
   !+- Project [_1#2 AS a#7, _2#3 AS b#8]      +- Project [_1#15 AS a#17, _2#16 AS b#18]
   !   +- LocalRelation [_1#2, _2#3]              +- LocalRelation [_1#15, _2#16]
   ```
   and then when the `.select()` API adds a `Project` node containing `df("a")` above the join, it can't be resolved.
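
   (One way to obtain a per-rule trace like the one above is the plan change log conf; `spark.sql.planChangeLog.level` is assumed here, and the exact conf name may differ across Spark versions:)
   ```
   spark.conf.set("spark.sql.planChangeLog.level", "warn")
   // Analysis runs eagerly when the join Dataset is constructed, so this logs
   // each analyzer rule application, including DeduplicateRelations.
   val joined = df2.join(df, df2("bb") === df("b"))
   ```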
   
   This is because `DeduplicateRelations` always keeps the attributes of the first occurrence of a node (`Project [_1#2 AS a#7, _2#3 AS b#8]` in this case) and creates new instances for the other occurrences. The rule doesn't (and can't) take into account whether a top-level attribute can actually come from a node or not.
   If `spark.sql.analyzer.failAmbiguousSelfJoin` is enabled, then the `DetectAmbiguousSelfJoin` rule catches the issue as:
   ```
   Column a#7 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via Dataset.as before joining them, and specify the column using qualified name, e.g. df.as("a").join(df.as("b"), $"a.id" > $"b.id"). You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
   ```
   If it is not enabled, then an `a#7` can't be resolved error is thrown.
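
   (As a side note, a minimal sketch of the workaround the error message suggests, reusing `df`/`df2` from the repro above and assuming `spark.implicits._` is in scope: alias the Datasets and use qualified names instead of `df("...")` references.)
   ```
   // Alias both sides and refer to columns by qualified name, so no ambiguous
   // df("...") references are needed.
   val left = df2.as("l")
   val right = df.as("r")
   left.join(right, $"l.bb" === $"r.b").select($"l.aa", $"r.a").show()
   ```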
   
   To solve the above regression, this PR:
   - Assigns `LogicalPlan.PLAN_ID_TAG`s to logical plans of `Dataset`s that don't have one. (The Connect planner already does this.)
   - Changes resolved `AttributeReference`s to `UnresolvedAttribute`s in certain `Dataset` APIs if an attribute doesn't seem valid based on the output of the underlying logical plan. The `UnresolvedAttribute`s get the necessary tags to get resolved later by the `ResolveReferences` rule (`ColumnResolutionHelper.tryResolveDataFrameColumns()`); see the sketch below.
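
   (A rough sketch of the second point, with heavy assumptions: `demoteToUnresolved` and the `sourcePlanId` plumbing are hypothetical names for illustration, not the PR's actual code.)
   ```
   import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
   import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

   // Hypothetical sketch: replace a resolved attribute that is not present in
   // the underlying plan's output with an UnresolvedAttribute tagged with the
   // originating Dataset's plan id, so that ResolveReferences
   // (ColumnResolutionHelper.tryResolveDataFrameColumns()) can resolve it later.
   def demoteToUnresolved(expr: Expression, plan: LogicalPlan, sourcePlanId: Long): Expression =
     expr.transform {
       case a: AttributeReference if !plan.outputSet.contains(a) =>
         val u = UnresolvedAttribute(a.qualifier :+ a.name)
         u.setTagValue(LogicalPlan.PLAN_ID_TAG, sourcePlanId)
         u
     }
   ```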
   
   
   ### Why are the changes needed?
   To fix the regression.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, it fixes the regression.
   
   
   ### How was this patch tested?
   New and existing UTs.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   




Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #45552:
URL: https://github.com/apache/spark/pull/45552#discussion_r1528028336


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -227,6 +227,11 @@ class Dataset[T] private[sql](
       dsIds.add(id)
       plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
     }
+    // A plan might get its PLAN_ID_TAG via connect or belong to multiple dataframes so only assign
+    // an id to a plan if it doesn't have any
+    if (plan.getTagValue(LogicalPlan.PLAN_ID_TAG).isEmpty) {

Review Comment:
   IIUC, this PR aims to fix a vanilla (non-Connect) Spark SQL issue with `PLAN_ID_TAG`.
   However, `PLAN_ID_TAG` is currently dedicated to Spark Connect only.





Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #45552:
URL: https://github.com/apache/spark/pull/45552#discussion_r1528050496


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -227,6 +227,11 @@ class Dataset[T] private[sql](
       dsIds.add(id)
       plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
     }
+    // A plan might get its PLAN_ID_TAG via connect or belong to multiple dataframes so only assign
+    // an id to a plan if it doesn't have any
+    if (plan.getTagValue(LogicalPlan.PLAN_ID_TAG).isEmpty) {

Review Comment:
   Yes, that idea came up here: https://github.com/apache/spark/pull/45446#discussion_r1520922028





Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on PR #45552:
URL: https://github.com/apache/spark/pull/45552#issuecomment-2010096626

   I had some time to play with Connect today and, as I said, it works well with the query in the PR description, but it doesn't seem to support even the most basic self-joins:
   ```
   @ val df = Seq((1, 2)).toDF("a", "b")
   Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
   df: org.apache.spark.sql.package.DataFrame = [a: int, b: int]
   
   @ val df2 = df.select(df("a").as("aa"), df("b"))
   df2: org.apache.spark.sql.package.DataFrame = [aa: int, b: int]
   
   @ val df3 = df2.join(df, df2("b") === df("b"))
   df3: org.apache.spark.sql.package.DataFrame = Invalid Dataframe; [AMBIGUOUS_COLUMN_REFERENCE] Column "b" is ambiguous. It's because you joined several DataFrame together, and some of these DataFrames are the same.
   This column points to one of the DataFrames but Spark is unable to figure out which one.
   Please alias the DataFrames with different names via `DataFrame.alias` before joining them,
   and specify the column using qualified name, e.g. `df.alias("a").join(df.alias("b"), col("a.id") > col("b.id"))`. SQLSTATE: 42702
   ```
   Am I missing something?




Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45552:
URL: https://github.com/apache/spark/pull/45552#issuecomment-2025219523

   @zhengruifeng can you take a look? I think we should make this work. We can find `df("b")` on both join sides, but this is a special case, as `df("b")` comes directly from the right join side. I don't want to make the algorithm very complicated, like searching column references on both sides and calculating the depth, but this simple special case is not ambiguous; see the sketch below.
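
   (A minimal sketch of that special case, assuming the plan-id based lookup Connect uses and that the Dataset's root plan carries the tag; `preferDirectSide` is a hypothetical name, not Spark's implementation.)
   ```
   import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

   // If the plan carrying the column's plan id is exactly one direct join side,
   // resolve against that side instead of failing with AMBIGUOUS_COLUMN_REFERENCE.
   def preferDirectSide(join: Join, planId: Long): Option[LogicalPlan] = {
     def hasId(p: LogicalPlan): Boolean =
       p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(planId)
     (hasId(join.left), hasId(join.right)) match {
       case (true, false) => Some(join.left)
       case (false, true) => Some(join.right)
       case _             => None // both or neither side: genuinely ambiguous / not found
     }
   }
   ```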




Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on PR #45552:
URL: https://github.com/apache/spark/pull/45552#issuecomment-2014648960

   @cloud-fan, I wonder if the above test should work in Connect, or whether Connect always requires aliasing conflicting columns, as in SQL?




Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45552:
URL: https://github.com/apache/spark/pull/45552#issuecomment-2007037219

   Column references in the classic Spark SQL DataFrame API are very broken, and I really don't want to add more hacks here and there to fix certain cases. In Spark Connect, we've redesigned column references and they are much more reliable and reasonable.
   
   How about adding a config to let classic column references use the same implementation as Spark Connect's? Ideally, users should update their DataFrame queries to always use named columns, like the SQL API:
   ```
   val df1 = abc.as("df1")
   val df2 = xyz.as("df2")
   df1.join(df2, $"df1.col" === $"df2.col")
   ```
   But if users really want to stick with the old style, they can turn on the config.




Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on PR #45552:
URL: https://github.com/apache/spark/pull/45552#issuecomment-2018722441

   @cloud-fan, since Connect-style resolution doesn't seem to work in all cases, I wonder if we can fix non-Connect resolution with this PR?




Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on PR #45552:
URL: https://github.com/apache/spark/pull/45552#issuecomment-2007273886

   I think unifying the resolution logic in Spark 4.0 is a good idea, and the above example works well via Connect. But I don't fully get the last sentence:
   
   > But if users really want to stick with the old style, they can turn on the config.
   
   Is there any known downside of the new Connect-style resolution? Why would someone switch back to the old style?
   
   Also, what shall we do with Spark 3.5? It would be great to fix this regression without requiring users to change their code.




Re: [PR] [SPARK-47217][SQL] Fix deduplicated expression resolution [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45552:
URL: https://github.com/apache/spark/pull/45552#issuecomment-2007311420

   There is no known downside of the Connect-style resolution, but it sounds scary to completely replace an existing implementation (which has been there for many years) with a new one, especially as the old implementation is quite hacky and may work for some corner cases unintentionally.
   
   I'd prefer to add a config to enable Connect-style resolution, but not by default.

