You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/22 11:12:55 UTC

[spark] branch branch-3.1 updated: [SPARK-34200][SQL] Ambiguous column reference should consider attribute availability

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 2a89a8e  [SPARK-34200][SQL] Ambiguous column reference should consider attribute availability
2a89a8e is described below

commit 2a89a8e84135fcd69398606083d513329a3396f2
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Jan 22 20:11:53 2021 +0900

    [SPARK-34200][SQL] Ambiguous column reference should consider attribute availability
    
    ### What changes were proposed in this pull request?
    
    This is a long-standing bug that exists since we have the ambiguous self-join check. A column reference is not ambiguous if it can only come from one join side (e.g. the other side has a project to only pick a few columns). An example is
    ```
    Join(b#1 = 3)
      TableScan(t, [a#0, b#1])
      Project(a#2)
        TableScan(t, [a#2, b#3])
    ```
    It's a self-join, but `b#1` is not ambiguous because it can't come from the right side, which only has column `a`.
    
    ### Why are the changes needed?
    
    to not fail valid self-join queries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    yea as a bug fix
    
    ### How was this patch tested?
    
    a new test
    
    Closes #31287 from cloud-fan/self-join.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit b8a69066271e82f146bbf6cd5638c544e49bb27f)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../sql/execution/analysis/DetectAmbiguousSelfJoin.scala    | 13 +++++++++++--
 .../scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala |  9 +++++++++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
index ef657ba..b26a078 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.analysis
 import scala.collection.mutable
 
 import org.apache.spark.sql.{AnalysisException, Column, Dataset}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Equality, Expression, ExprId}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Cast, Equality, Expression, ExprId}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -86,6 +86,7 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
       val colRefs = colRefAttrs.map(toColumnReference).distinct
       val ambiguousColRefs = mutable.HashSet.empty[ColumnReference]
       val dsIdSet = colRefs.map(_.datasetId).toSet
+      val inputAttrs = AttributeSet(plan.children.flatMap(_.output))
 
       plan.foreach {
         case LogicalPlanWithDatasetId(p, id) if dsIdSet.contains(id) =>
@@ -101,7 +102,15 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
                 // the attribute of column reference, then the column reference is ambiguous, as it
                 // refers to a column that gets regenerated by self-join.
                 val actualAttr = p.output(ref.colPos).asInstanceOf[AttributeReference]
-                if (actualAttr.exprId != ref.exprId) {
+                // We should only count ambiguous column references if the attribute is available as
+                // the input attributes of the root node. For example:
+                //   Join(b#1 = 3)
+                //     TableScan(t, [a#0, b#1])
+                //     Project(a#2)
+                //       TableScan(t, [a#2, b#3])
+                // This query is a self-join. The column 'b' in the join condition is not ambiguous,
+                // as it can't come from the right side, which only has column 'a'.
+                if (actualAttr.exprId != ref.exprId && inputAttrs.contains(actualAttr)) {
                   ambiguousColRefs += ref
                 }
               }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
index 50846d9..76f07b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
@@ -248,4 +248,13 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
     val ds_id2 = df.logicalPlan.getTagValue(Dataset.DATASET_ID_TAG)
     assert(ds_id1 === ds_id2)
   }
+
+  test("SPARK-34200: ambiguous column reference should consider attribute availability") {
+    withTable("t") {
+      sql("CREATE TABLE t USING json AS SELECT 1 a, 2 b")
+      val df1 = spark.table("t")
+      val df2 = df1.select("a")
+      checkAnswer(df1.join(df2, df1("b") === 2), Row(1, 2, 1))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org