Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/12/29 06:17:10 UTC

[PR] [WIP][SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

zhengruifeng opened a new pull request, #44532:
URL: https://github.com/apache/spark/pull/44532

   ### What changes were proposed in this pull request?
   Fix the ambiguous column detection logic in Spark Connect.
   
   ### Why are the changes needed?
   ```
   In [24]: df1 = spark.range(10).withColumn("a", sf.lit(0))
   
   In [25]: df2 = df1.withColumnRenamed("a", "b")
   
   In [26]: df1.join(df2, df1["a"] == df2["b"])
   Out[26]: 23/12/22 09:33:28 ERROR ErrorUtils: Spark Connect RPC error during: analyze. UserId: ruifeng.zheng. SessionId: eaa2161f-4b64-4dbf-9809-af6b696d3005.
   org.apache.spark.sql.AnalysisException: [AMBIGUOUS_COLUMN_REFERENCE] Column a is ambiguous. It's because you joined several DataFrame together, and some of these DataFrames are the same.
   This column points to one of the DataFrame but Spark is unable to figure out which one.
   Please alias the DataFrames with different names via DataFrame.alias before joining them,
   and specify the column using qualified name, e.g. df.alias("a").join(df.alias("b"), col("a.id") > col("b.id")). SQLSTATE: 42702
   	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.findPlanById(ColumnResolutionHelper.scala:555)
   	at 
   
   ```
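
The failure can be illustrated without a cluster. The following is a toy model only: `Plan`, `count_nodes_with_id`, `lookup`, and `resolve` are hypothetical stand-ins rather than Spark APIs, and attribute strings such as `a#1` merely imitate Catalyst expression ids. Because `df2` is derived from `df1`, the node tagged with `df1`'s plan id is reachable from both sides of the join, so a lookup that only counts matching nodes sees two candidates. Resolving at the tagged node and then discarding results that an ancestor no longer outputs leaves exactly one candidate.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Plan:
    """Toy stand-in for a logical plan node; not Spark's LogicalPlan."""
    output: List[str]                      # attributes written as "name#exprId"
    plan_id: Optional[int] = None          # id tagged by the client, if any
    children: List["Plan"] = field(default_factory=list)


def count_nodes_with_id(plan: Plan, target: int) -> int:
    """Count the nodes tagged with `target` anywhere in the tree."""
    own = 1 if plan.plan_id == target else 0
    return own + sum(count_nodes_with_id(c, target) for c in plan.children)


def lookup(plan: Plan, name: str) -> Optional[str]:
    """Find the attribute called `name` in this node's output."""
    return next((a for a in plan.output if a.split("#")[0] == name), None)


def resolve(plan: Plan, name: str, target: int) -> Optional[str]:
    """Resolve at the tagged node, then keep the result only while
    each ancestor on the way up still outputs that attribute."""
    if plan.plan_id == target:
        found = lookup(plan, name)
    else:
        hits = [r for r in (resolve(c, name, target) for c in plan.children)
                if r is not None]
        if len(hits) > 1:
            raise ValueError(f"ambiguous reference: {name}")
        found = hits[0] if hits else None
    return found if found in plan.output else None


df1 = Plan(output=["id#0", "a#1"], plan_id=1)
# withColumnRenamed("a", "b") is modelled as a projection over df1.
df2 = Plan(output=["id#0", "b#2"], plan_id=2, children=[df1])
join = Plan(output=df1.output + df2.output, children=[df1, df2])

print(count_nodes_with_id(join, 1))  # how often df1's node is reachable
print(resolve(join, "a", 1))         # df1["a"]
print(resolve(join, "b", 2))         # df2["b"]
```

In this model the right branch drops `a#1` at the renaming projection, which is why only the left branch contributes a result for `df1["a"]`.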
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, this fixes a bug.
   
   
   ### How was this patch tested?
   Added unit tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1441714170


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -330,6 +330,12 @@
     ],
     "sqlState" : "42704"
   },
+  "CANNOT_RESOLVE_WITH_PLAN_ID" : {
+    "message" : [
+      "Cannot resolve <expression> with plan id <id> in plan <plan>. It's probably because of illegal references like `df1.select(df2.col(\"a\"))`."

Review Comment:
   The plan id is internal; let's not expose it to end users.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442795460


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,69 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      u.getTagValue(LogicalPlan.PLAN_ID_TAG) match {

Review Comment:
   I feel this is a bit clearer, but I can also change it back.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1443679493


##########
python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py:
##########
@@ -151,12 +151,9 @@ def test_basic(self):
         assert_frame_equal(expected1.toPandas(), result1.toPandas())
 
         # Groupby one expression and aggregate one UDF with literal
-        result2 = df.groupby((col("id") + 1)).agg(weighted_mean_udf(df.v, lit(1.0))).sort(df.id + 1)
-        expected2 = (
-            df.groupby((col("id") + 1))
-            .agg(mean(df.v).alias("weighted_mean(v, 1.0)"))
-            .sort(df.id + 1)
-        )
+        key_col = col("id") + 1

Review Comment:
   Minimal reproducer:
   ```
   from pyspark.sql import functions as sf
   df1 = spark.range(10).withColumn("a", sf.lit(0))
   df1.groupby(df1.id + 1).agg(sf.max(df1.a)).sort(df1.id + 1)
   ```





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1438547075


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -520,9 +524,33 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
             "planId" -> planId.toString,
             "q" -> q.toString))
       }
-    })
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+      val matched = found.flatMap { case (child, nodes) =>
+        val resolved = nodes
+          .flatMap(resolveUnresolvedAttributeByPlan(u, _, isMetadataAccess))
+        if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          val metadataOutputSet = child.outputSet ++ AttributeSet(child.metadataOutput)

Review Comment:
   We need to filter with `output + metadataOutput` here; otherwise the metadata column tests will fail.
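
A small sketch of why the filter needs both sets. Everything here is hypothetical: plain Python sets stand in for Catalyst's `AttributeSet`, `keep_if_visible` is an invented helper, and `_metadata#9` is an invented attribute. A metadata column normally lives in `metadataOutput`, so checking a resolved metadata reference against `output` alone would discard it.

```python
from typing import FrozenSet, Optional


def keep_if_visible(resolved: Optional[str],
                    output: FrozenSet[str],
                    metadata_output: FrozenSet[str],
                    is_metadata_access: bool) -> Optional[str]:
    """Return `resolved` only if the node exposes it; otherwise None."""
    if resolved is None:
        return None
    # Metadata access is checked against the union of both sets.
    visible = output | metadata_output if is_metadata_access else output
    return resolved if resolved in visible else None


output = frozenset({"id#0", "a#1"})
metadata_output = frozenset({"_metadata#9"})

# Filtering a metadata reference against `output` alone loses it ...
only_output = keep_if_visible("_metadata#9", output, frozenset(), True)
# ... while the union of both sets keeps it.
with_metadata = keep_if_visible("_metadata#9", output, metadata_output, True)
# Per the NOTE in the diff, a metadata column may also sit in `output`.
promoted = keep_if_visible(
    "_metadata#9", output | {"_metadata#9"}, frozenset(), True)
```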





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442398662


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,76 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: LogicalPlan): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: LogicalPlan): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    val resolved = resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, q)
+    if (resolved.isEmpty) {
+      // For example:
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw new AnalysisException(
+        errorClass = "CANNOT_RESOLVE_WITH_PLAN_ID",
+        messageParameters = Map(
+          "expression" -> toSQLId(u.nameParts),
+          "id" -> planId.toString,
+          "plan" -> q.toString
+        ),
+        origin = u.origin
+      )
+    }
+    resolved
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Option[NamedExpression] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+        resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess)
+    } else {
+      val candidates = p.children.flatMap { child =>
+        val outputSet = if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          child.outputSet ++ AttributeSet(child.metadataOutput)
+        } else {
+          child.outputSet
+        }
+        resolveUnresolvedAttributeByPlanId(u, id, isMetadataAccess, child)

Review Comment:
   I guess we should use the child's `outputSet` here instead of `p`'s.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442788120


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,69 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      u.getTagValue(LogicalPlan.PLAN_ID_TAG) match {
+        case Some(id) =>
+          resolveUnresolvedAttributeByPlanId(u, id, q)
+        case _ => u
+      }
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
-    val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
-    if (planIdOpt.isEmpty) return None
-    val planId = planIdOpt.get
-    logDebug(s"Extract plan_id $planId from $u")
-
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
+      id: Long,
+      q: Seq[LogicalPlan]): NamedExpression = {
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+    // resolve at most 2 ambiguous references
+    val resolved = q.iterator
+      .flatMap(resolveUnresolvedAttributeByPlanId(u, id, isMetadataAccess, _))
+      .take(2).toSeq
+    if (resolved.isEmpty) {
+      //  e.g. df1.select(df2.a)   <-   illegal reference df2.a
+      throw QueryCompilationErrors.cannotResolveColumn(u)
+    } else if (resolved.length > 1) {
+      throw QueryCompilationErrors.ambiguousColumnReferences(u)
+    }
+    resolved.head
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Option[NamedExpression] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess)
+    } else {
+      val candidates = p.children.flatMap { child =>

Review Comment:
   I still think we should filter the candidates with the current plan's `p.outputSet` instead of the child plans' output sets.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1441713593


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -330,6 +330,12 @@
     ],
     "sqlState" : "42704"
   },
+  "CANNOT_RESOLVE_WITH_PLAN_ID" : {
+    "message" : [
+      "Cannot resolve <expression> with plan id <id> in plan <plan>. It's probably because of illegal references like `df1.select(df2.col(\"a\"))`."

Review Comment:
   ```suggestion
         "Cannot resolve dataframe column <colName>. It's probably because of illegal references like `df1.select(df2.col(\"a\"))`."
   ```





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442794622


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,69 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      u.getTagValue(LogicalPlan.PLAN_ID_TAG) match {
+        case Some(id) =>
+          resolveUnresolvedAttributeByPlanId(u, id, q)
+        case _ => u
+      }
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
-    val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
-    if (planIdOpt.isEmpty) return None
-    val planId = planIdOpt.get
-    logDebug(s"Extract plan_id $planId from $u")
-
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
+      id: Long,
+      q: Seq[LogicalPlan]): NamedExpression = {
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+    // resolve at most 2 ambiguous references
+    val resolved = q.iterator
+      .flatMap(resolveUnresolvedAttributeByPlanId(u, id, isMetadataAccess, _))
+      .take(2).toSeq
+    if (resolved.isEmpty) {
+      //  e.g. df1.select(df2.a)   <-   illegal reference df2.a
+      throw QueryCompilationErrors.cannotResolveColumn(u)
+    } else if (resolved.length > 1) {
+      throw QueryCompilationErrors.ambiguousColumnReferences(u)
+    }
+    resolved.head
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Option[NamedExpression] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess)
+    } else {
+      val candidates = p.children.flatMap { child =>

Review Comment:
   Got it, I will change it back.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1440005640


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,81 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: LogicalPlan): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: LogicalPlan): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    var result = Option.empty[NamedExpression]
+    var numMatched = 0
+    resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, q).foreach {
+      case Some(resolved) =>
+        numMatched += 1
+        if (result.isEmpty) {
+          result = Some(resolved)
+        } else {
+          throw new AnalysisException(
+            errorClass = "AMBIGUOUS_COLUMN_REFERENCE",
+            messageParameters = Map("name" -> toSQLId(u.nameParts)),
+            origin = u.origin
+          )
+        }
+      case _ => numMatched += 1
+    }
+    if (numMatched == 0) {

Review Comment:
   I guess we still need a separate method to throw this exception when no plan matching the plan id is found.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1443679762


##########
python/pyspark/pandas/frame.py:
##########
@@ -7577,7 +7577,16 @@ def _sort(
             (False, "first"): Column.desc_nulls_first,
             (False, "last"): Column.desc_nulls_last,
         }
-        by = [mapper[(asc, na_position)](scol) for scol, asc in zip(by, ascending)]
+

Review Comment:
   Minimal reproducer:
   ```
   from pyspark.sql import functions as sf
   df1 = spark.range(10).withColumn("a", sf.lit(0))
   df1.select((df1.id + df1.a).alias("b")).sort(df1.id)
   ```
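
What happens to `df1.id` in this reproducer can be sketched in pure Python. This is a toy model under stated assumptions: `Plan`, `resolve_by_plan_id`, and `describe` are hypothetical names, not Spark APIs, and ambiguity handling is left out for brevity. The sort key is tagged with `df1`'s plan id, but the `select` above `df1` no longer outputs `id`, so the tagged node is found while the resolved attribute is filtered out on the way up. Returning a `(resolved, matched)` pair lets the caller tell that case apart from a reference to an unrelated DataFrame.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Plan:
    """Toy stand-in for a logical plan node; not Spark's LogicalPlan."""
    output: List[str]
    plan_id: Optional[int] = None
    children: List["Plan"] = field(default_factory=list)


def resolve_by_plan_id(plan: Plan, name: str,
                       target: int) -> Tuple[Optional[str], bool]:
    """Return (resolved attribute or None, whether the tagged node was seen)."""
    if plan.plan_id == target:
        resolved = name if name in plan.output else None
        matched = True
    else:
        resolved, matched = None, False
        for child in plan.children:
            r, m = resolve_by_plan_id(child, name, target)
            resolved = resolved or r
            matched = matched or m
    # Drop the result if this node no longer outputs the attribute.
    if resolved not in plan.output:
        resolved = None
    return resolved, matched


def describe(result: Tuple[Optional[str], bool]) -> str:
    resolved, matched = result
    if not matched:
        return "error: unknown DataFrame"
    if resolved is None:
        return "deferred: resolve by name later"
    return f"resolved: {resolved}"


df1 = Plan(output=["id", "a"], plan_id=1)
# select((df1.id + df1.a).alias("b")) only outputs "b".
selected = Plan(output=["b"], plan_id=2, children=[df1])

print(describe(resolve_by_plan_id(selected, "id", 1)))   # sort(df1.id)
print(describe(resolve_by_plan_id(selected, "id", 99)))  # unrelated DataFrame
print(describe(resolve_by_plan_id(df1, "id", 1)))        # direct reference
```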





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1445750062


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -485,80 +485,86 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //    4. if more than one matching nodes are found, fail due to ambiguous column reference;
   //    5. resolve the expression with the matching node, if any error occurs here, return the
   //       original expression as it is.
-  private def tryResolveColumnByPlanId(
+  private def tryResolveDataFrameColumns(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveDataFrameColumn(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveDataFrameColumns(c, q))
     case _ => e
   }
 
-  private def resolveUnresolvedAttributeByPlanId(
+  private def resolveDataFrameColumn(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: Seq[LogicalPlan]): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
-      }
-    })
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).nonEmpty
+    val (resolved, matched) = resolveDataFrameColumnByPlanId(u, planId, isMetadataAccess, q)
+    if (!matched) {
+      // Can not find the target plan node with plan id, e.g.
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw QueryCompilationErrors.cannotResolveColumn(u)
+    }
+    resolved
+  }
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
-    try {
-      if (!isMetadataAccess) {
-        plan.resolve(u.nameParts, conf.resolver)
-      } else if (u.nameParts.size == 1) {
-        plan.getMetadataAttributeByNameOpt(u.nameParts.head)
-      } else {
-        None
+  private def resolveDataFrameColumnByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      q: Seq[LogicalPlan]): (Option[NamedExpression], Boolean) = {
+    q.iterator.map(resolveDataFrameColumnRecursively(u, id, isMetadataAccess, _))
+      .foldLeft((Option.empty[NamedExpression], false)) {
+        case ((r1, m1), (r2, m2)) =>
+          if (r1.nonEmpty && r2.nonEmpty) {
+            throw QueryCompilationErrors.ambiguousColumnReferences(u)
+          }
+          (if (r1.nonEmpty) r1 else r2, m1 | m2)
       }
-    } catch {
-      case e: AnalysisException =>
-        logDebug(s"Fail to resolve $u with $plan due to $e")
-        None
-    }
   }
 
-  private def findPlanById(
+  private def resolveDataFrameColumnRecursively(
       u: UnresolvedAttribute,
       id: Long,
-      plan: LogicalPlan): Option[LogicalPlan] = {
-    if (plan.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
-      Some(plan)
-    } else if (plan.children.length == 1) {
-      findPlanById(u, id, plan.children.head)
-    } else if (plan.children.length > 1) {
-      val matched = plan.children.flatMap(findPlanById(u, id, _))
-      if (matched.length > 1) {
-        throw new AnalysisException(
-          errorClass = "AMBIGUOUS_COLUMN_REFERENCE",
-          messageParameters = Map("name" -> toSQLId(u.nameParts)),
-          origin = u.origin
-        )
-      } else {
-        matched.headOption
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): (Option[NamedExpression], Boolean) = {
+    val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      var resolved = Option.empty[NamedExpression]
+      try {
+        if (!isMetadataAccess) {
+          resolved = p.resolve(u.nameParts, conf.resolver)
+        } else if (u.nameParts.size == 1) {
+          resolved = p.getMetadataAttributeByNameOpt(u.nameParts.head)
+        }
+      } catch {
+        case e: AnalysisException =>
+          logDebug(s"Fail to resolve $u with $p due to $e")
       }
+      (resolved, true)
     } else {
-      None
+      resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, p.children)
+    }
+
+    // Even with the target plan node, tryResolveDataFrameColumns still
+    // can not guarantee successfully resolving u:
+    // there are several rules supporting missing column resolution
+    // (e.g. ResolveReferencesInSort), but the resolved attribute maybe filtered
+    // out by the output attribute set.
+    // In this case, fall back to column resolution without plan id.
+    val filtered = resolved.filter { r =>
+      if (isMetadataAccess) {
+        r.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
+      } else {
+        r.references.subsetOf(p.outputSet)
+      }
     }
+    (filtered, matched)

Review Comment:
   The comment here should say that `filtered` can be `None`, in which case the DataFrame column remains unresolved and we try to resolve it without the plan id later. The reason is to support missing-column resolution.
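
As a rough illustration of the behaviour this comment asks to document, here is a minimal Python sketch. It is a toy model, not Spark code: `resolve_with_plan_id`, `resolve_column`, and the set-based arguments are hypothetical names, columns are plain strings, and the fallback step is an assumption about how a later name-based rule might behave.

```python
from typing import Optional, Set, Tuple

def resolve_with_plan_id(name: str, tagged_output: Set[str],
                         visible_output: Set[str]) -> Optional[str]:
    """Toy plan-id resolution followed by the output filter (hypothetical)."""
    resolved = name if name in tagged_output else None
    # The `filtered` step: drop a match the current node no longer outputs.
    if resolved is not None and resolved not in visible_output:
        return None
    return resolved

def resolve_column(name: str, tagged_output: Set[str], visible_output: Set[str],
                   missing_input: Set[str]) -> Tuple[str, Optional[str]]:
    """Try the plan id first; otherwise fall back to plain name resolution."""
    first = resolve_with_plan_id(name, tagged_output, visible_output)
    if first is not None:
        return ("plan_id", first)
    # Assumption: a later rule may also look at the child's columns.
    candidates = visible_output | missing_input
    return ("fallback", name if name in candidates else None)

# Shape of a sort-after-aggregate case: `id` belongs to the tagged
# DataFrame but is not in the aggregate's output.
print(resolve_column("id", {"id", "a"}, {"(id + 1)", "max(a)"}, {"id", "a"}))
```

The point of the sketch is only that a filtered-out match is not an error: the column stays unresolved at this step and gets a second chance.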





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1445746187


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -485,80 +485,86 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //    4. if more than one matching nodes are found, fail due to ambiguous column reference;
   //    5. resolve the expression with the matching node, if any error occurs here, return the
   //       original expression as it is.
-  private def tryResolveColumnByPlanId(
+  private def tryResolveDataFrameColumns(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveDataFrameColumn(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveDataFrameColumns(c, q))
     case _ => e
   }
 
-  private def resolveUnresolvedAttributeByPlanId(
+  private def resolveDataFrameColumn(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: Seq[LogicalPlan]): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
-      }
-    })
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).nonEmpty
+    val (resolved, matched) = resolveDataFrameColumnByPlanId(u, planId, isMetadataAccess, q)
+    if (!matched) {
+      // Can not find the target plan node with plan id, e.g.
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw QueryCompilationErrors.cannotResolveColumn(u)
+    }
+    resolved
+  }
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
-    try {
-      if (!isMetadataAccess) {
-        plan.resolve(u.nameParts, conf.resolver)
-      } else if (u.nameParts.size == 1) {
-        plan.getMetadataAttributeByNameOpt(u.nameParts.head)
-      } else {
-        None
+  private def resolveDataFrameColumnByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      q: Seq[LogicalPlan]): (Option[NamedExpression], Boolean) = {
+    q.iterator.map(resolveDataFrameColumnRecursively(u, id, isMetadataAccess, _))
+      .foldLeft((Option.empty[NamedExpression], false)) {
+        case ((r1, m1), (r2, m2)) =>
+          if (r1.nonEmpty && r2.nonEmpty) {
+            throw QueryCompilationErrors.ambiguousColumnReferences(u)
+          }
+          (if (r1.nonEmpty) r1 else r2, m1 | m2)
       }
-    } catch {
-      case e: AnalysisException =>
-        logDebug(s"Fail to resolve $u with $plan due to $e")
-        None
-    }
   }
 
-  private def findPlanById(
+  private def resolveDataFrameColumnRecursively(
       u: UnresolvedAttribute,
       id: Long,
-      plan: LogicalPlan): Option[LogicalPlan] = {
-    if (plan.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
-      Some(plan)
-    } else if (plan.children.length == 1) {
-      findPlanById(u, id, plan.children.head)
-    } else if (plan.children.length > 1) {
-      val matched = plan.children.flatMap(findPlanById(u, id, _))
-      if (matched.length > 1) {
-        throw new AnalysisException(
-          errorClass = "AMBIGUOUS_COLUMN_REFERENCE",
-          messageParameters = Map("name" -> toSQLId(u.nameParts)),
-          origin = u.origin
-        )
-      } else {
-        matched.headOption
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): (Option[NamedExpression], Boolean) = {
+    val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      var resolved = Option.empty[NamedExpression]

Review Comment:
   nit:
   ```
   val resolved = try {
     if ...
   } catch {
     ...
     None
   }
   (resolved, true)
   ```





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1441712533


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -330,6 +330,12 @@
     ],
     "sqlState" : "42704"
   },
+  "CANNOT_RESOLVE_WITH_PLAN_ID" : {

Review Comment:
   ```suggestion
     "CANNOT_RESOLVE_DATAFRAME_COLUMN" : {
   ```





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1440104336


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,81 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: LogicalPlan): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: LogicalPlan): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    var result = Option.empty[NamedExpression]
+    var numMatched = 0
+    resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, q).foreach {
+      case Some(resolved) =>
+        numMatched += 1
+        if (result.isEmpty) {
+          result = Some(resolved)
+        } else {
+          throw new AnalysisException(
+            errorClass = "AMBIGUOUS_COLUMN_REFERENCE",
+            messageParameters = Map("name" -> toSQLId(u.nameParts)),
+            origin = u.origin
+          )
+        }
+      case _ => numMatched += 1
+    }
+    if (numMatched == 0) {
+      // For example:
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3051",
+        messageParameters = Map(
+          "u" -> u.toString,
+          "planId" -> planId.toString,
+          "q" -> q.toString))
+    }
+    result
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Iterator[Option[NamedExpression]] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      Iterator.single(
+        resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess))
+    } else {
+      p.children.iterator.flatMap { child =>
+        val outputSet = if (isMetadataAccess) {

Review Comment:
   I think `resolveUnresolvedAttributeByPlanId` should always return a valid column. So the code can be further simplified:
   ```
   def resolveUnresolvedAttributeByPlanId(...): Option[NamedExpression] = {
     ...
     val candidates = p.children.flatMap(resolveUnresolvedAttributeByPlanId(...))
       .filter(p.outputSet.contains)
     if (candidates.length > 1) fail ...
     candidates.headOption
   }
   ```
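
To make the suggestion concrete, here is a small Python sketch of the self-join case from the PR description. It is a toy model under stated assumptions: `Node`, `find_nodes`, `resolve`, and `AmbiguousColumn` are hypothetical names, and columns are compared by name, whereas Spark compares attributes by expression id. `find_nodes` mimics the removed `findPlanById` idea of counting matching nodes, while `resolve` follows the "always return a valid column" idea by filtering candidates against the parent's output.

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Node:
    output: List[str]
    plan_id: Optional[int] = None
    children: List["Node"] = field(default_factory=list)

class AmbiguousColumn(Exception):
    pass

def find_nodes(plan: Node, plan_id: int) -> List[Node]:
    """Collect every node tagged with the plan id (the old approach)."""
    if plan.plan_id == plan_id:
        return [plan]
    return [n for c in plan.children for n in find_nodes(c, plan_id)]

def resolve(plan: Node, plan_id: int, name: str) -> Optional[str]:
    """Return the column only if it is still valid at this node."""
    if plan.plan_id == plan_id:
        return name if name in plan.output else None
    candidates = []
    for child in plan.children:
        r = resolve(child, plan_id, name)
        # Drop candidates that this node does not output.
        if r is not None and r in plan.output:
            candidates.append(r)
    if len(candidates) > 1:
        raise AmbiguousColumn(name)
    return candidates[0] if candidates else None

# df1 has plan id 1; df2 renames `a` to `b` on top of df1.
df1 = Node(output=["id", "a"], plan_id=1)
df2 = Node(output=["id", "b"], plan_id=2,
           children=[Node(output=["id", "a"], plan_id=1)])
join = Node(output=["id", "a", "id", "b"], children=[df1, df2])

print(len(find_nodes(join, 1)))  # two tagged nodes are found
print(resolve(join, 1, "a"))     # only the left branch still outputs `a`
```

In this model `a` resolves because the copy under `df2` is filtered out by the rename, while `id` is still reported as ambiguous since both branches expose it.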





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1441726435


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,76 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: LogicalPlan): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: LogicalPlan): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    val resolved = resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, q)
+    if (resolved.isEmpty) {
+      // For example:
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw new AnalysisException(
+        errorClass = "CANNOT_RESOLVE_WITH_PLAN_ID",
+        messageParameters = Map(
+          "expression" -> toSQLId(u.nameParts),
+          "id" -> planId.toString,
+          "plan" -> q.toString
+        ),
+        origin = u.origin
+      )
+    }
+    resolved
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Option[NamedExpression] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+        resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess)
+    } else {
+      val candidates = p.children.flatMap { child =>
+        val outputSet = if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          child.outputSet ++ AttributeSet(child.metadataOutput)
+        } else {
+          child.outputSet
+        }
+        resolveUnresolvedAttributeByPlanId(u, id, isMetadataAccess, child)

Review Comment:
   We can assume that `resolveUnresolvedAttributeByPlanId` always returns a valid column; then the code can be simplified:
   ```
   val candidates = p.children.flatMap(resolveUnresolvedAttributeByPlanId...)
   if (candidates.length > 1) fail ...
   val outputSet = if (isMetadataAccess) p.outputSet ++ ... else p.outputSet
   candidates.filter(_.references.subsetOf(outputSet))
   ```
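
The filtering step in this suggestion can be sketched in Python as follows. This is a toy model with hypothetical names (`Candidate`, `allowed_attributes`, `keep_valid`); attributes are plain strings rather than Catalyst attributes. It checks each candidate's references against the node's output, presumably because a resolved expression is not always a bare attribute (for example, it may be a nested-field access on a struct column).

```python
from typing import FrozenSet, Iterable, List, Tuple

Candidate = Tuple[str, FrozenSet[str]]  # (resolved name, referenced attributes)

def allowed_attributes(output: Iterable[str], metadata_output: Iterable[str],
                       is_metadata_access: bool) -> FrozenSet[str]:
    """Attributes a candidate may reference at this node (toy model)."""
    allowed = set(output)
    if is_metadata_access:
        # A metadata column may sit in `output` or in `metadata_output`.
        allowed |= set(metadata_output)
    return frozenset(allowed)

def keep_valid(candidates: List[Candidate], output: Iterable[str],
               metadata_output: Iterable[str],
               is_metadata_access: bool) -> List[str]:
    """Keep candidates whose references are all visible at this node."""
    allowed = allowed_attributes(output, metadata_output, is_metadata_access)
    return [name for name, refs in candidates if refs <= allowed]

nested = ("s.x", frozenset({"s"}))  # refers to struct column `s`
meta = ("_metadata", frozenset({"_metadata"}))
print(keep_valid([nested], ["id", "s"], [], False))
print(keep_valid([meta], ["id"], ["_metadata"], True))
print(keep_valid([meta], ["id"], ["_metadata"], False))
```

The metadata candidate survives only when the access is flagged as a metadata access, which mirrors the `output ++ metadataOutput` branch in the diff.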





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1443679762


##########
python/pyspark/pandas/frame.py:
##########
@@ -7577,7 +7577,16 @@ def _sort(
             (False, "first"): Column.desc_nulls_first,
             (False, "last"): Column.desc_nulls_last,
         }
-        by = [mapper[(asc, na_position)](scol) for scol, asc in zip(by, ascending)]
+

Review Comment:
   Minimal reproducer:
   ```
   from pyspark.sql import functions as sf
   df1 = spark.range(10).withColumn("a", sf.lit(0))
   df1.groupby(df1.id + 1).agg(sf.max(df1.a)).sort(df1.id + 1)
   ```





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442402530


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,76 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: LogicalPlan): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: LogicalPlan): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    val resolved = resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, q)
+    if (resolved.isEmpty) {
+      // For example:
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw new AnalysisException(
+        errorClass = "CANNOT_RESOLVE_WITH_PLAN_ID",
+        messageParameters = Map(
+          "expression" -> toSQLId(u.nameParts),
+          "id" -> planId.toString,
+          "plan" -> q.toString
+        ),
+        origin = u.origin
+      )
+    }
+    resolved
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Option[NamedExpression] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+        resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess)
+    } else {
+      val candidates = p.children.flatMap { child =>
+        val outputSet = if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          child.outputSet ++ AttributeSet(child.metadataOutput)
+        } else {
+          child.outputSet
+        }
+        resolveUnresolvedAttributeByPlanId(u, id, isMetadataAccess, child)

Review Comment:
   I think it could be
   ```
   val candidates = p.children.flatMap { child =>
     val candidate = resolveUnresolvedAttributeByPlanId...
     if (candidate.isEmpty) {
       None
     } else {
       val outputSet = ...
       candidate.filter(_.references.subsetOf(outputSet))
     }
   }
   ```

   so that an `outputSet` is only created for a non-empty `candidate`.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1439142630


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -520,9 +524,33 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
             "planId" -> planId.toString,
             "q" -> q.toString))
       }
-    })
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+      val matched = found.flatMap { case (child, nodes) =>
+        val resolved = nodes
+          .flatMap(resolveUnresolvedAttributeByPlan(u, _, isMetadataAccess))
+        if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          val metadataOutputSet = child.outputSet ++ AttributeSet(child.metadataOutput)
+          resolved.filter(metadataOutputSet.contains)
+        } else {
+          resolved.filter(child.outputSet.contains)
+        }
+      }
+      if (matched.length > 1) {
+        throw new AnalysisException(
+          errorClass = "AMBIGUOUS_COLUMN_REFERENCE",
+          messageParameters = Map("name" -> toSQLId(u.nameParts)),
+          origin = u.origin
+        )
+      }
+      matched.headOption
+    }
+  }
+
+  private def resolveUnresolvedAttributeByPlan(
+    u: UnresolvedAttribute,

Review Comment:
   nit: use 4-space indentation for the parameters





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1438547075


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -520,9 +524,33 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
             "planId" -> planId.toString,
             "q" -> q.toString))
       }
-    })
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+      val matched = found.flatMap { case (child, nodes) =>
+        val resolved = nodes
+          .flatMap(resolveUnresolvedAttributeByPlan(u, _, isMetadataAccess))
+        if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          val metadataOutputSet = child.outputSet ++ AttributeSet(child.metadataOutput)

Review Comment:
   We need to filter with `output ++ metadataOutput`; otherwise the metadata-related tests will fail.






Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #44532:
URL: https://github.com/apache/spark/pull/44532#issuecomment-1872499783

   cc @cloud-fan I think it is ready for the initial review




Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442741533


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,69 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      u.getTagValue(LogicalPlan.PLAN_ID_TAG) match {
+        case Some(id) =>
+          resolveUnresolvedAttributeByPlanId(u, id, q)
+        case _ => u
+      }
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(

Review Comment:
   We don't need this method if the caller of it can get the `isMetadataAccess` flag first.






Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442742244


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,69 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      u.getTagValue(LogicalPlan.PLAN_ID_TAG) match {

Review Comment:
   Why don't we get the plan id inside `resolveUnresolvedAttributeByPlanId`?





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1440104963


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,81 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: LogicalPlan): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: LogicalPlan): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    var result = Option.empty[NamedExpression]
+    var numMatched = 0
+    resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, q).foreach {
+      case Some(resolved) =>
+        numMatched += 1
+        if (result.isEmpty) {
+          result = Some(resolved)
+        } else {
+          throw new AnalysisException(
+            errorClass = "AMBIGUOUS_COLUMN_REFERENCE",
+            messageParameters = Map("name" -> toSQLId(u.nameParts)),
+            origin = u.origin
+          )
+        }
+      case _ => numMatched += 1
+    }
+    if (numMatched == 0) {
+      // For example:
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3051",
+        messageParameters = Map(
+          "u" -> u.toString,
+          "planId" -> planId.toString,
+          "q" -> q.toString))
+    }
+    result
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Iterator[Option[NamedExpression]] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      Iterator.single(
+        resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess))
+    } else {
+      p.children.iterator.flatMap { child =>
+        val outputSet = if (isMetadataAccess) {

Review Comment:
   This is more efficient as plans like `Project` can prune the resolved column earlier. The outer `resolveUnresolvedAttributeByPlanId` should fail if there is no column matched.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1439143757


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -520,9 +524,33 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
             "planId" -> planId.toString,
             "q" -> q.toString))
       }
-    })
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+      val matched = found.flatMap { case (child, nodes) =>
+        val resolved = nodes
+          .flatMap(resolveUnresolvedAttributeByPlan(u, _, isMetadataAccess))
+        if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          val metadataOutputSet = child.outputSet ++ AttributeSet(child.metadataOutput)
+          resolved.filter(metadataOutputSet.contains)
+        } else {
+          resolved.filter(child.outputSet.contains)

Review Comment:
   `resolveUnresolvedAttributeByPlanId` is not recursive, which means we only do this pruning once at the very end. Shall we make `resolveUnresolvedAttributeByPlanId` recursive to deal with cases that have many joins in one query?
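
A minimal sketch of pruning at every level, using a toy plan tree. `Attr`, `Plan`, and `resolve` are illustrative stand-ins and not Spark's `LogicalPlan` API; the plan ids and attribute ids are made up to mirror the self-join in the PR description.

```scala
// Toy model only: these classes are assumptions for illustration, not Catalyst types.
object RecursivePruningSketch {
  // An attribute is identified by a unique id, similar in spirit to an expression id.
  final case class Attr(name: String, exprId: Int)

  // A plan node with an optional plan id, its visible output, and its children.
  final case class Plan(planId: Option[Long], output: Seq[Attr], children: Seq[Plan] = Nil)

  // Collects attributes named `name` from nodes tagged with `id`, and drops any
  // candidate at each ancestor whose output no longer contains it.
  def resolve(name: String, id: Long, p: Plan): Seq[Attr] = {
    val found =
      if (p.planId.contains(id)) p.output.filter(_.name == name)
      else p.children.flatMap(child => resolve(name, id, child))
    // Prune at every level, not only once at the root.
    found.filter(attr => p.output.contains(attr))
  }

  // df1 has columns (id, a); df2 models df1.withColumnRenamed("a", "b").
  val id0 = Attr("id", 0)
  val a = Attr("a", 1)
  val b = Attr("b", 2) // the rename is modelled as a new attribute
  val df1 = Plan(Some(1L), Seq(id0, a))
  val df2 = Plan(Some(2L), Seq(id0, b), Seq(df1))
  // df1.join(df2, ...): the node tagged 1 is reachable from both sides of the join.
  val join = Plan(None, df1.output ++ df2.output, Seq(df1, df2))
}
```

In this toy model, `resolve("a", 1L, join)` yields only the left-side attribute, because the copy found under `df2` is dropped at the renaming node. `resolve("id", 1L, join)` still yields two candidates, which a caller would have to treat as ambiguous.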





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1439143838


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -520,9 +524,33 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
             "planId" -> planId.toString,
             "q" -> q.toString))
       }
-    })
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+      val matched = found.flatMap { case (child, nodes) =>
+        val resolved = nodes
+          .flatMap(resolveUnresolvedAttributeByPlan(u, _, isMetadataAccess))
+        if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          val metadataOutputSet = child.outputSet ++ AttributeSet(child.metadataOutput)
+          resolved.filter(metadataOutputSet.contains)
+        } else {
+          resolved.filter(child.outputSet.contains)

Review Comment:
   The goal is to do this pruning as early as possible.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1445668556


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,81 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: Seq[LogicalPlan]): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).nonEmpty
+    val (resolved, matched) = resolveByPlanId(u, planId, isMetadataAccess, q)
+
+    if (!matched) {
+      // Can not find the target plan node with plan id, e.g.
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw QueryCompilationErrors.cannotResolveColumn(u)
+    }
+    resolved
+  }
+
+  private def resolveByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      q: Seq[LogicalPlan]): (Option[NamedExpression], Boolean) = {
+    q.iterator.map(resolveByPlanId(u, id, isMetadataAccess, _))
+      .foldLeft((Option.empty[NamedExpression], false)) {
+        case ((r1, m1), (r2, m2)) =>
+          if (r1.nonEmpty && r2.nonEmpty) {
+            throw QueryCompilationErrors.ambiguousColumnReferences(u)
+          }
+          (if (r1.nonEmpty) r1 else r2, m1 | m2)
       }
-    })
+  }
+
+  private def resolveByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): (Option[NamedExpression], Boolean) = {
+    val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      (resolveByPlan(u, p, isMetadataAccess), true)
+    } else {
+      resolveByPlanId(u, id, isMetadataAccess, p.children)
+    }
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+    // Even with the target plan node, resolveUnresolvedAttributeByPlanId still
+    // can not guarantee successfully resolving u:
+    // there are several rules supporting missing column resolution
+    // (e.g. ResolveReferencesInSort), but the resolved attribute maybe filtered
+    // out by the output attribute set.
+    // In this case, fall back to column resolution without plan id.
+    val filtered = resolved.filter { r =>
+      if (isMetadataAccess) {
+        r.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput))
+      } else {
+        r.references.subsetOf(p.outputSet)
+      }
+    }
+    (filtered, matched)
+  }
+
+  private def resolveByPlan(

Review Comment:
   This function is called in only one place, so we can inline it.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1445668047


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,81 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(

Review Comment:
   Let's rethink the naming. I think this one can be `tryResolveDataFrameColumns`, which calls `resolveDataFrameColumn`, which calls `resolveDataFrameColumnByPlanId`, which in turn calls `resolveDataFrameColumnRecursively`.





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1444298258


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,83 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: Seq[LogicalPlan]): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    val (resolved, matched) =
+      q.iterator.map(resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, _))
+        .foldLeft[(Option[NamedExpression], Boolean)]((None, false)) {
+          case ((r1, m1), (r2, m2)) =>
+            if (r1.nonEmpty && r2.nonEmpty) {
+              throw QueryCompilationErrors.ambiguousColumnReferences(u)
+            }
+            (if (r1.isEmpty) r2 else r1, m1 | m2)
+        }
+
+    if (!matched) {
+      // Can not find the target plan node with plan id, e.g.
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw QueryCompilationErrors.cannotResolveColumn(u)
+    }
+
+    // Even with the target plan node, resolveUnresolvedAttributeByPlanId still
+    // can not guarantee successfully resolve u:
+    // this method is invoked in rules which support missing column resolution
+    // (e.g. ResolveReferencesInSort), then the resolved attribute maybe filtered
+    // out by the output attribute set.
+    // In this case, fall back to column resolution without plan id.
+    resolved
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): (Option[NamedExpression], Boolean) = {
+    val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      (resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess), true)
+    } else {
+      p.children.iterator.map(resolveUnresolvedAttributeByPlanId(u, id, isMetadataAccess, _))

Review Comment:
   Shall we add a method for this to avoid the duplicated code?
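
One possible shape for such a shared method, sketched outside Spark. The name `mergeResults` is hypothetical, columns are plain strings rather than `NamedExpression`, and a generic exception stands in for `QueryCompilationErrors.ambiguousColumnReferences`.

```scala
// Hypothetical helper: the name and the simplified types are assumptions.
object MergeSketch {
  // Each subtree reports (resolved column, whether the target plan node was found).
  type Result = (Option[String], Boolean)

  def mergeResults(results: Iterator[Result]): Result =
    results.foldLeft((Option.empty[String], false)) {
      case ((r1, m1), (r2, m2)) =>
        // Two subtrees that both resolve the column make the reference ambiguous.
        if (r1.nonEmpty && r2.nonEmpty) {
          throw new IllegalStateException("ambiguous column reference")
        }
        // Keep whichever side resolved, and remember if any side found the node.
        (r1.orElse(r2), m1 || m2)
    }
}
```

Both the top-level lookup over a `Seq` of plans and the recursive step over `p.children` could then call the same fold instead of repeating it.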





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1445748444


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -485,80 +485,86 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //    4. if more than one matching nodes are found, fail due to ambiguous column reference;
   //    5. resolve the expression with the matching node, if any error occurs here, return the
   //       original expression as it is.
-  private def tryResolveColumnByPlanId(
+  private def tryResolveDataFrameColumns(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveDataFrameColumn(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveDataFrameColumns(c, q))
     case _ => e
   }
 
-  private def resolveUnresolvedAttributeByPlanId(
+  private def resolveDataFrameColumn(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: Seq[LogicalPlan]): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
-      }
-    })
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).nonEmpty
+    val (resolved, matched) = resolveDataFrameColumnByPlanId(u, planId, isMetadataAccess, q)
+    if (!matched) {
+      // Can not find the target plan node with plan id, e.g.
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw QueryCompilationErrors.cannotResolveColumn(u)
+    }
+    resolved
+  }
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
-    try {
-      if (!isMetadataAccess) {
-        plan.resolve(u.nameParts, conf.resolver)
-      } else if (u.nameParts.size == 1) {
-        plan.getMetadataAttributeByNameOpt(u.nameParts.head)
-      } else {
-        None
+  private def resolveDataFrameColumnByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      q: Seq[LogicalPlan]): (Option[NamedExpression], Boolean) = {
+    q.iterator.map(resolveDataFrameColumnRecursively(u, id, isMetadataAccess, _))
+      .foldLeft((Option.empty[NamedExpression], false)) {
+        case ((r1, m1), (r2, m2)) =>
+          if (r1.nonEmpty && r2.nonEmpty) {
+            throw QueryCompilationErrors.ambiguousColumnReferences(u)
+          }
+          (if (r1.nonEmpty) r1 else r2, m1 | m2)
       }
-    } catch {
-      case e: AnalysisException =>
-        logDebug(s"Fail to resolve $u with $plan due to $e")
-        None
-    }
   }
 
-  private def findPlanById(
+  private def resolveDataFrameColumnRecursively(
       u: UnresolvedAttribute,
       id: Long,
-      plan: LogicalPlan): Option[LogicalPlan] = {
-    if (plan.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
-      Some(plan)
-    } else if (plan.children.length == 1) {
-      findPlanById(u, id, plan.children.head)
-    } else if (plan.children.length > 1) {
-      val matched = plan.children.flatMap(findPlanById(u, id, _))
-      if (matched.length > 1) {
-        throw new AnalysisException(
-          errorClass = "AMBIGUOUS_COLUMN_REFERENCE",
-          messageParameters = Map("name" -> toSQLId(u.nameParts)),
-          origin = u.origin
-        )
-      } else {
-        matched.headOption
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): (Option[NamedExpression], Boolean) = {
+    val (resolved, matched) = if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+      var resolved = Option.empty[NamedExpression]
+      try {
+        if (!isMetadataAccess) {
+          resolved = p.resolve(u.nameParts, conf.resolver)
+        } else if (u.nameParts.size == 1) {
+          resolved = p.getMetadataAttributeByNameOpt(u.nameParts.head)
+        }
+      } catch {
+        case e: AnalysisException =>
+          logDebug(s"Fail to resolve $u with $p due to $e")
       }
+      (resolved, true)
     } else {
-      None
+      resolveDataFrameColumnByPlanId(u, id, isMetadataAccess, p.children)
+    }
+
+    // Even with the target plan node, tryResolveDataFrameColumns still
+    // can not guarantee successfully resolving u:
+    // there are several rules supporting missing column resolution
+    // (e.g. ResolveReferencesInSort), but the resolved attribute maybe filtered
+    // out by the output attribute set.
+    // In this case, fall back to column resolution without plan id.
+    val filtered = resolved.filter { r =>

Review Comment:
   I think the comment here should explain why we filter the resolved columns.
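
As I read the diff, the filter matters because the caller distinguishes three outcomes from the `(resolved, matched)` pair. A small sketch of that reading, with hypothetical names (`Outcome`, `interpret`) and strings in place of `NamedExpression`:

```scala
// Illustrative only: these names are assumptions, not part of the PR.
object OutcomeSketch {
  sealed trait Outcome
  // No node carries the plan id: the reference is illegal and an error is raised.
  case object PlanNotFound extends Outcome
  // The node exists, but the column did not resolve or was filtered out by an
  // ancestor's output: leave the attribute for resolution without plan id.
  case object FallBackToName extends Outcome
  // The node exists and the column is still visible at the current plan.
  final case class Resolved(column: String) extends Outcome

  def interpret(resolved: Option[String], matched: Boolean): Outcome =
    (resolved, matched) match {
      case (_, false)        => PlanNotFound
      case (Some(col), true) => Resolved(col)
      case (None, true)      => FallBackToName
    }
}
```

Without the filter, a column that an intermediate node has already dropped would land in the `Resolved` case even though the current plan cannot see it.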





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #44532: [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join
URL: https://github.com/apache/spark/pull/44532




Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1441712533


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -330,6 +330,12 @@
     ],
     "sqlState" : "42704"
   },
+  "CANNOT_RESOLVE_WITH_PLAN_ID" : {

Review Comment:
   ```suggestion
     "CANNOT_RESOLVE_COLUMN_WITH_PLAN_ID" : {
   ```





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1441724194


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,76 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: LogicalPlan): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: LogicalPlan): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    val resolved = resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, q)
+    if (resolved.isEmpty) {
+      // For example:
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw new AnalysisException(
+        errorClass = "CANNOT_RESOLVE_WITH_PLAN_ID",
+        messageParameters = Map(
+          "expression" -> toSQLId(u.nameParts),
+          "id" -> planId.toString,
+          "plan" -> q.toString
+        ),
+        origin = u.origin
+      )
+    }
+    resolved
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Option[NamedExpression] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+        resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess)

Review Comment:
   ```suggestion
         resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess)
   ```





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442402530


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,76 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: LogicalPlan): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: LogicalPlan): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    val resolved = resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, q)
+    if (resolved.isEmpty) {
+      // For example:
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw new AnalysisException(
+        errorClass = "CANNOT_RESOLVE_WITH_PLAN_ID",
+        messageParameters = Map(
+          "expression" -> toSQLId(u.nameParts),
+          "id" -> planId.toString,
+          "plan" -> q.toString
+        ),
+        origin = u.origin
+      )
+    }
+    resolved
+  }
+
+  private def resolveUnresolvedAttributeByPlanId(
+      u: UnresolvedAttribute,
+      id: Long,
+      isMetadataAccess: Boolean,
+      p: LogicalPlan): Option[NamedExpression] = {
+    if (p.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(id)) {
+        resolveUnresolvedAttributeByPlan(u, p, isMetadataAccess)
+    } else {
+      val candidates = p.children.flatMap { child =>
+        val outputSet = if (isMetadataAccess) {
+          // NOTE: A metadata column might appear in `output` instead of `metadataOutput`.
+          child.outputSet ++ AttributeSet(child.metadataOutput)
+        } else {
+          child.outputSet
+        }
+        resolveUnresolvedAttributeByPlanId(u, id, isMetadataAccess, child)

Review Comment:
   I think it could be
   ```
   val candidates = p.children.flatMap(resolveUnresolvedAttributeByPlanId...)
   if (candidates.isEmpty) {
     None
   } else {
     val outputSet = ...
     candidates.filter(_.references.subsetOf(outputSet))
   }
   ```
   
   so that an `outputSet` is only created when `candidates` is non-empty.
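
The suggestion above can be illustrated with a small, Spark-free sketch in Python. The names `prune_candidates` and `build_output_set` are hypothetical and only model the idea: the (possibly expensive) output set is built lazily, and only when there is something to filter.

```python
from typing import Callable, FrozenSet, List


def prune_candidates(
    candidates: List[str],
    build_output_set: Callable[[], FrozenSet[str]],
) -> List[str]:
    """Keep only the candidates that are visible in the output set.

    `build_output_set` is a thunk, so the set is never constructed
    when there are no candidates to check.
    """
    if not candidates:
        return []
    output_set = build_output_set()
    return [c for c in candidates if c in output_set]


# Count how often the output set is actually built.
calls = {"n": 0}


def make_output_set() -> FrozenSet[str]:
    calls["n"] += 1
    return frozenset({"id", "b"})


empty_result = prune_candidates([], make_output_set)
calls_after_empty = calls["n"]
kept = prune_candidates(["a", "b"], make_output_set)
calls_after_nonempty = calls["n"]
```

In this sketch the first call never touches `make_output_set`, while the second call builds the set exactly once.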





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1444297166


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,83 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      resolveUnresolvedAttributeByPlanId(u, q).getOrElse(u)
     case _ if e.containsPattern(UNRESOLVED_ATTRIBUTE) =>
-      e.mapChildren(c => tryResolveColumnByPlanId(c, q, idToPlan))
+      e.mapChildren(c => tryResolveColumnByPlanId(c, q))
     case _ => e
   }
 
   private def resolveUnresolvedAttributeByPlanId(
       u: UnresolvedAttribute,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan]): Option[NamedExpression] = {
+      q: Seq[LogicalPlan]): Option[NamedExpression] = {
     val planIdOpt = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
     if (planIdOpt.isEmpty) return None
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val plan = idToPlan.getOrElseUpdate(planId, {
-      findPlanById(u, planId, q).getOrElse {
-        // For example:
-        //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
-        //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
-        //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(
-          errorClass = "_LEGACY_ERROR_TEMP_3051",
-          messageParameters = Map(
-            "u" -> u.toString,
-            "planId" -> planId.toString,
-            "q" -> q.toString))
+    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+
+    val (resolved, matched) =
+      q.iterator.map(resolveUnresolvedAttributeByPlanId(u, planId, isMetadataAccess, _))
+        .foldLeft[(Option[NamedExpression], Boolean)]((None, false)) {
+          case ((r1, m1), (r2, m2)) =>
+            if (r1.nonEmpty && r2.nonEmpty) {
+              throw QueryCompilationErrors.ambiguousColumnReferences(u)
+            }
+            (if (r1.isEmpty) r2 else r1, m1 | m2)
+        }
+
+    if (!matched) {
+      // Can not find the target plan node with plan id, e.g.
+      //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
+      //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
+      //  df1.select(df2.a)   <-   illegal reference df2.a
+      throw QueryCompilationErrors.cannotResolveColumn(u)
+    }
+
+    // Even with the target plan node, resolveUnresolvedAttributeByPlanId still
+    // can not guarantee successfully resolve u:
+    // this method is invoked in rules which support missing column resolution
+    // (e.g. ResolveReferencesInSort), then the resolved attribute maybe filtered
+    // out by the output attribute set.
+    // In this case, fall back to column resolution without plan id.
+    resolved

Review Comment:
   This doesn't seem like the right place for this comment. I think we should move it to where we filter the resolved columns.
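
For readers following the hunk above, the `foldLeft` that merges the per-plan `(resolved, matched)` pairs can be sketched in Python as follows. This is a simplified model, not the Spark implementation: columns are plain strings, and `combine` and `AmbiguousColumnReference` are hypothetical names.

```python
from functools import reduce
from typing import Iterable, Optional, Tuple

# (resolved column or None, whether the target plan id was found)
Result = Tuple[Optional[str], bool]


class AmbiguousColumnReference(Exception):
    """Stand-in for the ambiguity error raised in the diff."""


def combine(results: Iterable[Result]) -> Result:
    """Merge per-plan results the way the fold in the diff does."""

    def step(acc: Result, cur: Result) -> Result:
        (r1, m1), (r2, m2) = acc, cur
        if r1 is not None and r2 is not None:
            # Two plans both resolved the column: ambiguous.
            raise AmbiguousColumnReference()
        # Keep the first resolved column; remember if any plan matched.
        return (r2 if r1 is None else r1, m1 or m2)

    return reduce(step, results, (None, False))


resolved_once = combine([(None, False), ("a", True)])
# The plan id matched, but the column was pruned by an output set.
matched_but_pruned = combine([(None, True), (None, False)])
no_match = combine([])
```

The `matched_but_pruned` case corresponds to the situation the code comment describes: the target plan node exists, yet nothing is resolved, so the caller falls back to resolution without the plan id instead of raising an error.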





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1441368306


##########
python/pyspark/pandas/base.py:
##########
@@ -1361,10 +1361,14 @@ def value_counts(
                 # If even one StructField is null, that row should be dropped.
                 index_spark_column_names = self._internal.index_spark_column_names
                 spark_column = self.spark.column

Review Comment:
   A minimal reproducer is:
   ```
   In [16]: from pyspark.sql import functions as sf
   
   In [17]: df1 = spark.range(10).withColumn("a", sf.lit(0))
   
   In [18]: c = sf.struct(df1["id"], df1["a"]).alias("s")
   
   In [19]: df2 = df1.select(c)
   
   In [20]: df2.where(c['id'] > 0)
   Out[20]: 24/01/04 13:53:41 ERROR ErrorUtils: Spark Connect RPC error during: analyze. UserId: ruifeng.zheng. SessionId: ac766649-8114-42ab-8bbb-f0f358b4a717.
   org.apache.spark.sql.AnalysisException: [CANNOT_RESOLVE_WITH_PLAN_ID] Cannot resolve `id` with plan id 21 in plan
   
   '[25]Filter '`>`('struct('id, 'a) AS s#299[id], 0)
   +- [24]Project [struct(id, id#292L, a, a#295) AS s#298]
      +- [21]Project [id#292L, 0 AS a#295]
         +- Range (0, 10, step=1, splits=Some(12))
   . It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
   ```
   







Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1442798403


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -487,42 +487,69 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
   //       original expression as it is.
   private def tryResolveColumnByPlanId(
       e: Expression,
-      q: LogicalPlan,
-      idToPlan: mutable.HashMap[Long, LogicalPlan] = mutable.HashMap.empty): Expression = e match {
+      q: Seq[LogicalPlan]): Expression = e match {
     case u: UnresolvedAttribute =>
-      resolveUnresolvedAttributeByPlanId(
-        u, q, idToPlan: mutable.HashMap[Long, LogicalPlan]
-      ).getOrElse(u)
+      u.getTagValue(LogicalPlan.PLAN_ID_TAG) match {

Review Comment:
   not a big deal





Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #44532:
URL: https://github.com/apache/spark/pull/44532#issuecomment-1884099314

   Thanks @cloud-fan for the guidance and review.
   
   Merged to master.




Re: [PR] [SPARK-46541][SQL][CONNECT] Fix the ambiguous column reference in self join [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44532:
URL: https://github.com/apache/spark/pull/44532#discussion_r1439441456


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -520,9 +532,41 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
             "planId" -> planId.toString,
             "q" -> q.toString))
       }
-    })
+      if (resolved.length > 1) {
+        throw new AnalysisException(
+          errorClass = "AMBIGUOUS_COLUMN_REFERENCE",
+          messageParameters = Map("name" -> toSQLId(u.nameParts)),
+          origin = u.origin
+        )
+      }
+      resolved.headOption
+    }
+  }
 
-    val isMetadataAccess = u.getTagValue(LogicalPlan.IS_METADATA_COL).isDefined
+  private def resolveUnresolvedAttributeByPlanId(

Review Comment:
   We should merge this with the other `resolveUnresolvedAttributeByPlanId`. The overall algorithm should be:
   1. Traverse the plan tree bottom-up to find the matching DataFrame plan.
   2. Resolve the column and propagate it up.
   3. During propagation, prune the resolved columns with the plan's output. This should happen at every plan node, not just for the top plan node's children.
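
The three steps above can be sketched as a toy model in Python. Everything here is an assumption made for illustration: `Plan`, `AmbiguousColumn`, and `resolve_by_plan_id` are hypothetical names, columns are identified by plain name rather than by Catalyst attributes, and the plan ids are made up.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Plan:
    """Toy stand-in for a logical plan node (hypothetical, not a Spark class)."""
    output: List[str]                       # column names this node exposes
    children: List["Plan"] = field(default_factory=list)
    plan_id: Optional[int] = None           # id tagged on a DataFrame's plan


class AmbiguousColumn(Exception):
    """Raised when more than one candidate survives pruning."""


def resolve_by_plan_id(name: str, plan_id: int, p: Plan) -> List[str]:
    """Return the candidates for `name` that are still visible at node `p`."""
    if p.plan_id == plan_id:
        # Step 1: this is the matching DataFrame plan; resolve against it.
        return [name] if name in p.output else []
    candidates: List[str] = []
    for child in p.children:
        # Step 2: resolve further down and propagate the result upwards.
        resolved = resolve_by_plan_id(name, plan_id, child)
        # Step 3: prune with the child's output, at every level of the tree.
        candidates.extend(c for c in resolved if c in child.output)
    if len(candidates) > 1:
        raise AmbiguousColumn(name)
    return candidates


# Shape of the self-join from the PR description:
#   df2 = df1.withColumnRenamed("a", "b"); df1.join(df2, df1["a"] == df2["b"])
df1 = Plan(output=["id", "a"], plan_id=1)
df2 = Plan(output=["id", "b"], children=[df1], plan_id=2)
join = Plan(output=["id", "a", "id", "b"], children=[df1, df2])

left_key = resolve_by_plan_id("a", 1, join)
right_key = resolve_by_plan_id("b", 2, join)
```

In this model `df1["a"]` is found on both sides of the join, but the copy underneath `df2` is pruned because `df2` no longer outputs `a`, so a single candidate remains. Joining `df1` with itself would leave two candidates and raise `AmbiguousColumn`.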


