You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/06/24 21:29:12 UTC
spark git commit: [SPARK-7088] [SQL] Fix analysis for 3rd party logical plan.
Repository: spark
Updated Branches:
refs/heads/master 43e66192f -> b84d4b4df
[SPARK-7088] [SQL] Fix analysis for 3rd party logical plan.
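The ResolveReferences analysis rule no longer throws when it cannot resolve conflicting references in a self-join; it leaves the join unchanged, and CheckAnalysis reports the failure if the conflict is still unresolved.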
Author: Santiago M. Mola <sm...@stratio.com>
Closes #6853 from smola/SPARK-7088 and squashes the following commits:
af71ac7 [Santiago M. Mola] [SPARK-7088] Fix analysis for 3rd party logical plan.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b84d4b4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b84d4b4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b84d4b4d
Branch: refs/heads/master
Commit: b84d4b4dfe8ced1b96a0c74ef968a20a1bba8231
Parents: 43e6619
Author: Santiago M. Mola <sm...@stratio.com>
Authored: Wed Jun 24 12:29:07 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Jun 24 12:29:07 2015 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 38 ++++++++++----------
.../sql/catalyst/analysis/CheckAnalysis.scala | 12 +++++++
2 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b84d4b4d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0a3f5a7..b06759f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -283,7 +283,7 @@ class Analyzer(
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
- val (oldRelation, newRelation) = right.collect {
+ right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
@@ -308,25 +308,27 @@ class Analyzer(
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
- }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
- sys.error(
- s"""
- |Failure when resolving conflicting references in Join:
- |$plan
- |
- |Conflicting attributes: ${conflictingAttributes.mkString(",")}
- """.stripMargin)
}
-
- val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
- val newRight = right transformUp {
- case r if r == oldRelation => newRelation
- } transformUp {
- case other => other transformExpressions {
- case a: Attribute => attributeRewrites.get(a).getOrElse(a)
- }
+ // Only handle first case, others will be fixed on the next pass.
+ .headOption match {
+ case None =>
+ /*
+ * No result implies that there is a logical plan node that produces new references
+ * that this rule cannot handle. When that is the case, there must be another rule
+ * that resolves these conflicts. Otherwise, the analysis will fail.
+ */
+ j
+ case Some((oldRelation, newRelation)) =>
+ val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+ val newRight = right transformUp {
+ case r if r == oldRelation => newRelation
+ } transformUp {
+ case other => other transformExpressions {
+ case a: Attribute => attributeRewrites.get(a).getOrElse(a)
+ }
+ }
+ j.copy(right = newRight)
}
- j.copy(right = newRight)
// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on grandchild
http://git-wip-us.apache.org/repos/asf/spark/blob/b84d4b4d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c5a1437..a069b47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -48,6 +48,7 @@ trait CheckAnalysis {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
+
case operator: LogicalPlan =>
operator transformExpressionsUp {
case a: Attribute if !a.resolved =>
@@ -121,6 +122,17 @@ trait CheckAnalysis {
case _ => // Analysis successful!
}
+
+ // Special handling for cases when a self-join introduces duplicate expression ids.
+ case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+ val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+ failAnalysis(
+ s"""
+ |Failure when resolving conflicting references in Join:
+ |$plan
+ |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+ |""".stripMargin)
+
}
extendedCheckRules.foreach(_(plan))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org