You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/04 08:43:50 UTC
spark git commit: [SPARK-12828][SQL] Natural join follow-up
Repository: spark
Updated Branches:
refs/heads/master d39087147 -> dee801adb
[SPARK-12828][SQL] Natural join follow-up
This is a small addendum to #10762 to make the code more robust against future changes.
Author: Reynold Xin <rx...@databricks.com>
Closes #11070 from rxin/SPARK-12828-natural-join.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dee801ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dee801ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dee801ad
Branch: refs/heads/master
Commit: dee801adb78d6abd0abbf76b4dfa71aa296b4f0b
Parents: d390871
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Feb 3 23:43:48 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Feb 3 23:43:48 2016 -0800
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 21 +++++++++++---------
.../spark/sql/catalyst/plans/joinTypes.scala | 2 ++
.../analysis/ResolveNaturalJoinSuite.scala | 6 +++---
3 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/dee801ad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b30ed59..b59eb12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1239,21 +1239,23 @@ class Analyzer(
*/
object ResolveNaturalJoin extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- // Should not skip unresolved nodes because natural join is always unresolved.
case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural =>
- // find common column names from both sides, should be treated like usingColumns
+ // find common column names from both sides
val joinNames = left.output.map(_.name).intersect(right.output.map(_.name))
val leftKeys = joinNames.map(keyName => left.output.find(_.name == keyName).get)
val rightKeys = joinNames.map(keyName => right.output.find(_.name == keyName).get)
val joinPairs = leftKeys.zip(rightKeys)
+
// Add joinPairs to joinConditions
val newCondition = (condition ++ joinPairs.map {
case (l, r) => EqualTo(l, r)
- }).reduceLeftOption(And)
+ }).reduceOption(And)
+
// columns not in joinPairs
val lUniqueOutput = left.output.filterNot(att => leftKeys.contains(att))
val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att))
- // we should only keep unique columns(depends on joinType) for joinCols
+
+ // the output list looks like: join keys, columns from left, columns from right
val projectList = joinType match {
case LeftOuter =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
@@ -1261,13 +1263,14 @@ class Analyzer(
rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
case FullOuter =>
// in full outer join, joinCols should be non-null if there is.
- val joinedCols = joinPairs.map {
- case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)()
- }
- joinedCols ++ lUniqueOutput.map(_.withNullability(true)) ++
+ val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
+ joinedCols ++
+ lUniqueOutput.map(_.withNullability(true)) ++
rUniqueOutput.map(_.withNullability(true))
- case _ =>
+ case Inner =>
rightKeys ++ lUniqueOutput ++ rUniqueOutput
+ case _ =>
+ sys.error("Unsupported natural join type " + joinType)
}
// use Project to trim unnecessary fields
Project(projectList, Join(left, right, joinType, newCondition))
http://git-wip-us.apache.org/repos/asf/spark/blob/dee801ad/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
index b10f1e6..27a7532 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -62,5 +62,7 @@ case object LeftSemi extends JoinType {
}
case class NaturalJoin(tpe: JoinType) extends JoinType {
+ require(Seq(Inner, LeftOuter, RightOuter, FullOuter).contains(tpe),
+ "Unsupported natural join type " + tpe)
override def sql: String = "NATURAL " + tpe.sql
}
http://git-wip-us.apache.org/repos/asf/spark/blob/dee801ad/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
index a6554fb..fcf4ac1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
@@ -30,10 +30,10 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
lazy val aNotNull = a.notNull
lazy val bNotNull = b.notNull
lazy val cNotNull = c.notNull
- lazy val r1 = LocalRelation(a, b)
- lazy val r2 = LocalRelation(a, c)
+ lazy val r1 = LocalRelation(b, a)
+ lazy val r2 = LocalRelation(c, a)
lazy val r3 = LocalRelation(aNotNull, bNotNull)
- lazy val r4 = LocalRelation(bNotNull, cNotNull)
+ lazy val r4 = LocalRelation(cNotNull, bNotNull)
test("natural inner join") {
val plan = r1.join(r2, NaturalJoin(Inner), None)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org