You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/22 11:12:55 UTC
[spark] branch branch-3.1 updated: [SPARK-34200][SQL] Ambiguous
column reference should consider attribute availability
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 2a89a8e [SPARK-34200][SQL] Ambiguous column reference should consider attribute availability
2a89a8e is described below
commit 2a89a8e84135fcd69398606083d513329a3396f2
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Jan 22 20:11:53 2021 +0900
[SPARK-34200][SQL] Ambiguous column reference should consider attribute availability
### What changes were proposed in this pull request?
This is a long-standing bug that has existed ever since the ambiguous self-join check was introduced. A column reference is not ambiguous if it can only come from one side of the join (e.g. the other side has a Project that only picks a few columns). For example:
```
Join(b#1 = 3)
  TableScan(t, [a#0, b#1])
  Project(a#2)
    TableScan(t, [a#2, b#3])
```
It's a self-join, but `b#1` is not ambiguous because it can't come from the right side, which only has column `a`.
### Why are the changes needed?
To avoid failing valid self-join queries.
### Does this PR introduce _any_ user-facing change?
Yes, as a bug fix: valid self-join queries that previously failed with an ambiguous-reference error now succeed.
### How was this patch tested?
A new test was added to `DataFrameSelfJoinSuite`.
Closes #31287 from cloud-fan/self-join.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit b8a69066271e82f146bbf6cd5638c544e49bb27f)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../sql/execution/analysis/DetectAmbiguousSelfJoin.scala | 13 +++++++++++--
.../scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala | 9 +++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
index ef657ba..b26a078 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.analysis
import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Column, Dataset}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Equality, Expression, ExprId}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Cast, Equality, Expression, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
@@ -86,6 +86,7 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
val colRefs = colRefAttrs.map(toColumnReference).distinct
val ambiguousColRefs = mutable.HashSet.empty[ColumnReference]
val dsIdSet = colRefs.map(_.datasetId).toSet
+ val inputAttrs = AttributeSet(plan.children.flatMap(_.output))
plan.foreach {
case LogicalPlanWithDatasetId(p, id) if dsIdSet.contains(id) =>
@@ -101,7 +102,15 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
// the attribute of column reference, then the column reference is ambiguous, as it
// refers to a column that gets regenerated by self-join.
val actualAttr = p.output(ref.colPos).asInstanceOf[AttributeReference]
- if (actualAttr.exprId != ref.exprId) {
+ // We should only count ambiguous column references if the attribute is available as
+ // the input attributes of the root node. For example:
+ // Join(b#1 = 3)
+ // TableScan(t, [a#0, b#1])
+ // Project(a#2)
+ // TableScan(t, [a#2, b#3])
+ // This query is a self-join. The column 'b' in the join condition is not ambiguous,
+ // as it can't come from the right side, which only has column 'a'.
+ if (actualAttr.exprId != ref.exprId && inputAttrs.contains(actualAttr)) {
ambiguousColRefs += ref
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
index 50846d9..76f07b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
@@ -248,4 +248,13 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
val ds_id2 = df.logicalPlan.getTagValue(Dataset.DATASET_ID_TAG)
assert(ds_id1 === ds_id2)
}
+
+ test("SPARK-34200: ambiguous column reference should consider attribute availability") {
+ withTable("t") {
+ sql("CREATE TABLE t USING json AS SELECT 1 a, 2 b")
+ val df1 = spark.table("t")
+ val df2 = df1.select("a")
+ checkAnswer(df1.join(df2, df1("b") === 2), Row(1, 2, 1))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org