You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/07 04:25:15 UTC
spark git commit: [SPARK-12610][SQL] Left Anti Join
Repository: spark
Updated Branches:
refs/heads/master 4901086fe -> d76592276
[SPARK-12610][SQL] Left Anti Join
### What changes were proposed in this pull request?
This PR adds support for `LEFT ANTI JOIN` to Spark SQL. A `LEFT ANTI JOIN` is the exact opposite of a `LEFT SEMI JOIN` and can be used to identify rows in one dataset that are not in another dataset. Note that `nulls` on the left side of the join cannot match a row on the right hand side of the join; the result is that left anti join will always select a row with a `null` in one or more of its keys.
We currently add support for the following SQL join syntax:
SELECT *
FROM tbl1 A
LEFT ANTI JOIN tbl2 B
ON A.Id = B.Id
Or using a dataframe:
tbl1.as("a").join(tbl2.as("b"), $"a.id" === $"b.id", "left_anti)
This PR provides serves as the basis for implementing `NOT EXISTS` and `NOT IN (...)` correlated sub-queries. It would also serve as good basis for implementing an more efficient `EXCEPT` operator.
The PR has been (losely) based on PR's by both davies (https://github.com/apache/spark/pull/10706) and chenghao-intel (https://github.com/apache/spark/pull/10563); credit should be given where credit is due.
This PR adds supports for `LEFT ANTI JOIN` to `BroadcastHashJoin` (including codegeneration), `ShuffledHashJoin` and `BroadcastNestedLoopJoin`.
### How was this patch tested?
Added tests to `JoinSuite` and ported `ExistenceJoinSuite` from https://github.com/apache/spark/pull/10563.
cc davies chenghao-intel rxin
Author: Herman van Hovell <hv...@questtec.nl>
Closes #12214 from hvanhovell/SPARK-12610.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7659227
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7659227
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7659227
Branch: refs/heads/master
Commit: d76592276f9f66fed8012d876595de8717f516a9
Parents: 4901086
Author: Herman van Hovell <hv...@questtec.nl>
Authored: Wed Apr 6 19:25:10 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 6 19:25:10 2016 -0700
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 2 +
.../spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
.../sql/catalyst/optimizer/Optimizer.scala | 8 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 1 +
.../spark/sql/catalyst/plans/joinTypes.scala | 17 +-
.../catalyst/plans/logical/basicOperators.scala | 4 +-
.../sql/catalyst/parser/PlanParserSuite.scala | 5 +-
.../spark/sql/execution/SparkPlanner.scala | 2 +-
.../spark/sql/execution/SparkStrategies.scala | 11 +-
.../sql/execution/joins/BroadcastHashJoin.scala | 99 ++++++++----
.../joins/BroadcastNestedLoopJoin.scala | 57 ++++---
.../spark/sql/execution/joins/HashJoin.scala | 18 ++-
.../sql/execution/joins/ShuffledHashJoin.scala | 1 +
.../scala/org/apache/spark/sql/JoinSuite.scala | 36 ++---
.../execution/joins/ExistenceJoinSuite.scala | 159 +++++++++++++++++++
.../sql/execution/joins/SemiJoinSuite.scala | 129 ---------------
.../spark/sql/hive/HiveSessionState.scala | 2 +-
17 files changed, 338 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 8a45b4f..85cb585 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -380,6 +380,7 @@ joinType
| LEFT SEMI
| RIGHT OUTER?
| FULL OUTER?
+ | LEFT? ANTI
;
joinCriteria
@@ -878,6 +879,7 @@ INDEX: 'INDEX';
INDEXES: 'INDEXES';
LOCKS: 'LOCKS';
OPTION: 'OPTION';
+ANTI: 'ANTI';
STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 473c91e..bc8cf4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1424,7 +1424,7 @@ class Analyzer(
val projectList = joinType match {
case LeftOuter =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
- case LeftSemi =>
+ case LeftExistence(_) =>
leftKeys ++ lUniqueOutput
case RightOuter =>
rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c085a37..f581810 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -361,8 +361,8 @@ object ColumnPruning extends Rule[LogicalPlan] {
case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
p.copy(child = g.copy(join = false))
- // Eliminate unneeded attributes from right side of a LeftSemiJoin.
- case j @ Join(left, right, LeftSemi, condition) =>
+ // Eliminate unneeded attributes from right side of a Left Existence Join.
+ case j @ Join(left, right, LeftExistence(_), condition) =>
j.copy(right = prunedChild(right, j.references))
// all the columns will be used to compare, so we can't prune them
@@ -1126,7 +1126,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
(leftFilterConditions ++ commonFilterCondition).
reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
- case _ @ (LeftOuter | LeftSemi) =>
+ case LeftOuter | LeftExistence(_) =>
// push down the left side only `where` condition
val newLeft = leftFilterConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -1147,7 +1147,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
joinType match {
- case _ @ (Inner | LeftSemi) =>
+ case Inner | LeftExistence(_) =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 5a3aebf..aa59f3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -572,6 +572,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
case null => Inner
case jt if jt.FULL != null => FullOuter
case jt if jt.SEMI != null => LeftSemi
+ case jt if jt.ANTI != null => LeftAnti
case jt if jt.LEFT != null => LeftOuter
case jt if jt.RIGHT != null => RightOuter
case _ => Inner
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
index 9ca4f13..13f57c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -26,13 +26,15 @@ object JoinType {
case "leftouter" | "left" => LeftOuter
case "rightouter" | "right" => RightOuter
case "leftsemi" => LeftSemi
+ case "leftanti" => LeftAnti
case _ =>
val supported = Seq(
"inner",
"outer", "full", "fullouter",
"leftouter", "left",
"rightouter", "right",
- "leftsemi")
+ "leftsemi",
+ "leftanti")
throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
"Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
@@ -63,6 +65,10 @@ case object LeftSemi extends JoinType {
override def sql: String = "LEFT SEMI"
}
+case object LeftAnti extends JoinType {
+ override def sql: String = "LEFT ANTI"
+}
+
case class NaturalJoin(tpe: JoinType) extends JoinType {
require(Seq(Inner, LeftOuter, RightOuter, FullOuter).contains(tpe),
"Unsupported natural join type " + tpe)
@@ -70,7 +76,14 @@ case class NaturalJoin(tpe: JoinType) extends JoinType {
}
case class UsingJoin(tpe: JoinType, usingColumns: Seq[UnresolvedAttribute]) extends JoinType {
- require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter).contains(tpe),
+ require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti).contains(tpe),
"Unsupported using join type " + tpe)
override def sql: String = "USING " + tpe.sql
}
+
+object LeftExistence {
+ def unapply(joinType: JoinType): Option[JoinType] = joinType match {
+ case LeftSemi | LeftAnti => Some(joinType)
+ case _ => None
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a18efc9..d3353be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -252,7 +252,7 @@ case class Join(
override def output: Seq[Attribute] = {
joinType match {
- case LeftSemi =>
+ case LeftExistence(_) =>
left.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
@@ -276,7 +276,7 @@ case class Join(
.union(splitConjunctivePredicates(condition.get).toSet)
case Inner =>
left.constraints.union(right.constraints)
- case LeftSemi =>
+ case LeftExistence(_) =>
left.constraints
case LeftOuter =>
left.constraints
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 262537d..411e237 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -334,7 +334,7 @@ class PlanParserSuite extends PlanTest {
table("t").join(table("u"), UsingJoin(jt, Seq('a.attr, 'b.attr)), None).select(star()))
}
val testAll = Seq(testUnconditionalJoin, testConditionalJoin, testNaturalJoin, testUsingJoin)
-
+ val testExistence = Seq(testUnconditionalJoin, testConditionalJoin, testUsingJoin)
def test(sql: String, jt: JoinType, tests: Seq[(String, JoinType) => Unit]): Unit = {
tests.foreach(_(sql, jt))
}
@@ -348,6 +348,9 @@ class PlanParserSuite extends PlanTest {
test("right outer join", RightOuter, testAll)
test("full join", FullOuter, testAll)
test("full outer join", FullOuter, testAll)
+ test("left semi join", LeftSemi, testExistence)
+ test("left anti join", LeftAnti, testExistence)
+ test("anti join", LeftAnti, testExistence)
// Test multiple consecutive joins
assertEqual(
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index ac8072f..8d05ae4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -38,7 +38,7 @@ class SparkPlanner(
DDLStrategy ::
SpecialLimits ::
Aggregation ::
- LeftSemiJoin ::
+ ExistenceJoin ::
EquiJoinSelection ::
InMemoryScans ::
BasicOperators ::
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d77aba7..eee2b94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -62,16 +62,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
- object LeftSemiJoin extends Strategy with PredicateHelper {
+ object ExistenceJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(
- LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
+ LeftExistence(jt), leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
Seq(joins.BroadcastHashJoin(
- leftKeys, rightKeys, LeftSemi, BuildRight, condition, planLater(left), planLater(right)))
+ leftKeys, rightKeys, jt, BuildRight, condition, planLater(left), planLater(right)))
// Find left semi joins where at least some predicates can be evaluated by matching join keys
- case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
+ case ExtractEquiJoinKeys(
+ LeftExistence(jt), leftKeys, rightKeys, condition, left, right) =>
Seq(joins.ShuffledHashJoin(
- leftKeys, rightKeys, LeftSemi, BuildRight, condition, planLater(left), planLater(right)))
+ leftKeys, rightKeys, jt, BuildRight, condition, planLater(left), planLater(right)))
case _ => Nil
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 67ac9e9..e3d554c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{BinaryNode, CodegenSupport, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.util.collection.CompactBuffer
/**
* Performs an inner hash join of two child relations. When the output RDD of this operator is
@@ -87,6 +86,7 @@ case class BroadcastHashJoin(
case Inner => codegenInner(ctx, input)
case LeftOuter | RightOuter => codegenOuter(ctx, input)
case LeftSemi => codegenSemi(ctx, input)
+ case LeftAnti => codegenAnti(ctx, input)
case x =>
throw new IllegalArgumentException(
s"BroadcastHashJoin should not take $x as the JoinType")
@@ -160,15 +160,14 @@ case class BroadcastHashJoin(
}
/**
- * Generates the code for Inner join.
+ * Generate the (non-equi) condition used to filter joined rows. This is used in Inner, Left Semi
+ * and Left Anti joins.
*/
- private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
- val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
- val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+ private def getJoinCondition(
+ ctx: CodegenContext,
+ input: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
val matched = ctx.freshName("matched")
val buildVars = genBuildSideVars(ctx, matched)
- val numOutput = metricTerm(ctx, "numOutputRows")
-
val checkCondition = if (condition.isDefined) {
val expr = condition.get
// evaluate the variables from build side that used by condition
@@ -184,6 +183,17 @@ case class BroadcastHashJoin(
} else {
""
}
+ (matched, checkCondition, buildVars)
+ }
+
+ /**
+ * Generates the code for Inner join.
+ */
+ private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+ val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
+ val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+ val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input)
+ val numOutput = metricTerm(ctx, "numOutputRows")
val resultVars = buildSide match {
case BuildLeft => buildVars ++ input
@@ -221,7 +231,6 @@ case class BroadcastHashJoin(
}
}
-
/**
* Generates the code for left or right outer join.
*/
@@ -276,7 +285,6 @@ case class BroadcastHashJoin(
ctx.copyResult = true
val matches = ctx.freshName("matches")
val iteratorCls = classOf[Iterator[UnsafeRow]].getName
- val i = ctx.freshName("i")
val found = ctx.freshName("found")
s"""
|// generate join key for stream side
@@ -304,26 +312,8 @@ case class BroadcastHashJoin(
private def codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
- val matched = ctx.freshName("matched")
- val buildVars = genBuildSideVars(ctx, matched)
+ val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")
-
- val checkCondition = if (condition.isDefined) {
- val expr = condition.get
- // evaluate the variables from build side that used by condition
- val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references)
- // filter the output via condition
- ctx.currentVars = input ++ buildVars
- val ev = BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).gen(ctx)
- s"""
- |$eval
- |${ev.code}
- |if (${ev.isNull} || !${ev.value}) continue;
- """.stripMargin
- } else {
- ""
- }
-
if (broadcastRelation.value.keyIsUnique) {
s"""
|// generate join key for stream side
@@ -357,4 +347,57 @@ case class BroadcastHashJoin(
""".stripMargin
}
}
+
+ /**
+ * Generates the code for anti join.
+ */
+ private def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+ val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
+ val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
+ val (matched, checkCondition, _) = getJoinCondition(ctx, input)
+ val numOutput = metricTerm(ctx, "numOutputRows")
+
+ if (broadcastRelation.value.keyIsUnique) {
+ s"""
+ |// generate join key for stream side
+ |${keyEv.code}
+ |// Check if the key has nulls.
+ |if (!($anyNull)) {
+ | // Check if the HashedRelation exists.
+ | UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
+ | if ($matched != null) {
+ | // Evaluate the condition.
+ | $checkCondition
+ | }
+ |}
+ |$numOutput.add(1);
+ |${consume(ctx, input)}
+ """.stripMargin
+ } else {
+ val matches = ctx.freshName("matches")
+ val iteratorCls = classOf[Iterator[UnsafeRow]].getName
+ val found = ctx.freshName("found")
+ s"""
+ |// generate join key for stream side
+ |${keyEv.code}
+ |// Check if the key has nulls.
+ |if (!($anyNull)) {
+ | // Check if the HashedRelation exists.
+ | $iteratorCls $matches = ($iteratorCls)$relationTerm.get(${keyEv.value});
+ | if ($matches != null) {
+ | // Evaluate the condition.
+ | boolean $found = false;
+ | while (!$found && $matches.hasNext()) {
+ | UnsafeRow $matched = (UnsafeRow) $matches.next();
+ | $checkCondition
+ | $found = true;
+ | }
+ | if ($found) continue;
+ | }
+ |}
+ |$numOutput.add(1);
+ |${consume(ctx, input)}
+ """.stripMargin
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 4143e94..4ba710c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -73,7 +73,7 @@ case class BroadcastNestedLoopJoin(
left.output.map(_.withNullability(true)) ++ right.output
case FullOuter =>
left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
- case LeftSemi =>
+ case LeftExistence(_) =>
left.output
case x =>
throw new IllegalArgumentException(
@@ -175,8 +175,11 @@ case class BroadcastNestedLoopJoin(
* The implementation for these joins:
*
* LeftSemi with BuildRight
+ * Anti with BuildRight
*/
- private def leftSemiJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
+ private def leftExistenceJoin(
+ relation: Broadcast[Array[InternalRow]],
+ exists: Boolean): RDD[InternalRow] = {
assert(buildSide == BuildRight)
streamed.execute().mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
@@ -184,10 +187,12 @@ case class BroadcastNestedLoopJoin(
if (condition.isDefined) {
streamedIter.filter(l =>
- buildRows.exists(r => boundCondition(joinedRow(l, r)))
+ buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
)
+ } else if (buildRows.nonEmpty == exists) {
+ streamedIter
} else {
- streamedIter.filter(r => !buildRows.isEmpty)
+ Iterator.empty
}
}
}
@@ -199,6 +204,7 @@ case class BroadcastNestedLoopJoin(
* RightOuter with BuildRight
* FullOuter
* LeftSemi with BuildLeft
+ * Anti with BuildLeft
*/
private def defaultJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
/** All rows that either match both-way, or rows from streamed joined with nulls. */
@@ -236,7 +242,27 @@ case class BroadcastNestedLoopJoin(
}
i += 1
}
- return sparkContext.makeRDD(buf.toSeq)
+ return sparkContext.makeRDD(buf)
+ }
+
+ val notMatchedBroadcastRows: Seq[InternalRow] = {
+ val nulls = new GenericMutableRow(streamed.output.size)
+ val buf: CompactBuffer[InternalRow] = new CompactBuffer()
+ var i = 0
+ val buildRows = relation.value
+ val joinedRow = new JoinedRow
+ joinedRow.withLeft(nulls)
+ while (i < buildRows.length) {
+ if (!matchedBroadcastRows.get(i)) {
+ buf += joinedRow.withRight(buildRows(i)).copy()
+ }
+ i += 1
+ }
+ buf
+ }
+
+ if (joinType == LeftAnti) {
+ return sparkContext.makeRDD(notMatchedBroadcastRows)
}
val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter =>
@@ -264,22 +290,6 @@ case class BroadcastNestedLoopJoin(
}
}
- val notMatchedBroadcastRows: Seq[InternalRow] = {
- val nulls = new GenericMutableRow(streamed.output.size)
- val buf: CompactBuffer[InternalRow] = new CompactBuffer()
- var i = 0
- val buildRows = relation.value
- val joinedRow = new JoinedRow
- joinedRow.withLeft(nulls)
- while (i < buildRows.length) {
- if (!matchedBroadcastRows.get(i)) {
- buf += joinedRow.withRight(buildRows(i)).copy()
- }
- i += 1
- }
- buf.toSeq
- }
-
sparkContext.union(
matchedStreamRows,
sparkContext.makeRDD(notMatchedBroadcastRows)
@@ -295,13 +305,16 @@ case class BroadcastNestedLoopJoin(
case (LeftOuter, BuildRight) | (RightOuter, BuildLeft) =>
outerJoin(broadcastedRelation)
case (LeftSemi, BuildRight) =>
- leftSemiJoin(broadcastedRelation)
+ leftExistenceJoin(broadcastedRelation, exists = true)
+ case (LeftAnti, BuildRight) =>
+ leftExistenceJoin(broadcastedRelation, exists = false)
case _ =>
/**
* LeftOuter with BuildLeft
* RightOuter with BuildRight
* FullOuter
* LeftSemi with BuildLeft
+ * Anti with BuildLeft
*/
defaultJoin(broadcastedRelation)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index b7c0f3e..8f45d57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -47,7 +47,7 @@ trait HashJoin {
left.output ++ right.output.map(_.withNullability(true))
case RightOuter =>
left.output.map(_.withNullability(true)) ++ right.output
- case LeftSemi =>
+ case LeftExistence(_) =>
left.output
case x =>
throw new IllegalArgumentException(s"HashJoin should not take $x as the JoinType")
@@ -197,6 +197,20 @@ trait HashJoin {
}
}
+ private def antiJoin(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation): Iterator[InternalRow] = {
+ val joinKeys = streamSideKeyGenerator()
+ val joinedRow = new JoinedRow
+ streamIter.filter { current =>
+ val key = joinKeys(current)
+ lazy val buildIter = hashedRelation.get(key)
+ key.anyNull || buildIter == null || (condition.isDefined && !buildIter.exists {
+ row => boundCondition(joinedRow(current, row))
+ })
+ }
+ }
+
protected def join(
streamedIter: Iterator[InternalRow],
hashed: HashedRelation,
@@ -209,6 +223,8 @@ trait HashJoin {
outerJoin(streamedIter, hashed)
case LeftSemi =>
semiJoin(streamedIter, hashed)
+ case LeftAnti =>
+ antiJoin(streamedIter, hashed)
case x =>
throw new IllegalArgumentException(
s"BroadcastHashJoin should not take $x as the JoinType")
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index c63faac..bf86096 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -45,6 +45,7 @@ case class ShuffledHashJoin(
override def outputPartitioning: Partitioning = joinType match {
case Inner => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+ case LeftAnti => left.outputPartitioning
case LeftSemi => left.outputPartitioning
case LeftOuter => left.outputPartitioning
case RightOuter => right.outputPartitioning
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index a5a4ff1..a87a41c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -41,7 +41,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
assert(planned.size === 1)
}
- def assertJoin(sqlString: String, c: Class[_]): Any = {
+ def assertJoin(pair: (String, Class[_])): Any = {
+ val (sqlString, c) = pair
val df = sql(sqlString)
val physical = df.queryExecution.sparkPlan
val operators = physical.collect {
@@ -53,8 +54,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
assert(operators.size === 1)
- if (operators(0).getClass() != c) {
- fail(s"$sqlString expected operator: $c, but got ${operators(0)}\n physical: \n$physical")
+ if (operators.head.getClass != c) {
+ fail(s"$sqlString expected operator: $c, but got ${operators.head}\n physical: \n$physical")
}
}
@@ -93,8 +94,10 @@ class JoinSuite extends QueryTest with SharedSQLContext {
("SELECT * FROM testData right JOIN testData2 ON (key * a != key + a)",
classOf[BroadcastNestedLoopJoin]),
("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
- classOf[BroadcastNestedLoopJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
+ classOf[BroadcastNestedLoopJoin]),
+ ("SELECT * FROM testData ANTI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
+ ("SELECT * FROM testData LEFT ANTI JOIN testData2", classOf[BroadcastNestedLoopJoin])
+ ).foreach(assertJoin)
}
}
@@ -114,7 +117,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a where key = 2",
classOf[BroadcastHashJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
+ ).foreach(assertJoin)
sql("UNCACHE TABLE testData")
}
@@ -129,7 +132,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
classOf[BroadcastHashJoin]),
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
classOf[BroadcastHashJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
+ ).foreach(assertJoin)
sql("UNCACHE TABLE testData")
}
@@ -419,25 +422,22 @@ class JoinSuite extends QueryTest with SharedSQLContext {
Row(null, 10))
}
- test("broadcasted left semi join operator selection") {
+ test("broadcasted existence join operator selection") {
sqlContext.cacheManager.clearCache()
sql("CACHE TABLE testData")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") {
Seq(
- ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
- classOf[BroadcastHashJoin])
- ).foreach {
- case (query, joinClass) => assertJoin(query, joinClass)
- }
+ ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[BroadcastHashJoin]),
+ ("SELECT * FROM testData ANT JOIN testData2 ON key = a", classOf[BroadcastHashJoin])
+ ).foreach(assertJoin)
}
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Seq(
- ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[ShuffledHashJoin])
- ).foreach {
- case (query, joinClass) => assertJoin(query, joinClass)
- }
+ ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]),
+ ("SELECT * FROM testData LEFT ANTI JOIN testData2 ON key = a", classOf[ShuffledHashJoin])
+ ).foreach(assertJoin)
}
sql("UNCACHE TABLE testData")
@@ -489,7 +489,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
classOf[BroadcastNestedLoopJoin]),
("SELECT * FROM testData full JOIN testData2 WHERE (key * a != key + a)",
classOf[BroadcastNestedLoopJoin])
- ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
+ ).foreach(assertJoin)
checkAnswer(
sql(
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
new file mode 100644
index 0000000..8cdfa8a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftAnti, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.Join
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
+
+class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
+
+ private lazy val left = sqlContext.createDataFrame(
+ sparkContext.parallelize(Seq(
+ Row(1, 2.0),
+ Row(1, 2.0),
+ Row(2, 1.0),
+ Row(2, 1.0),
+ Row(3, 3.0),
+ Row(null, null),
+ Row(null, 5.0),
+ Row(6, null)
+ )), new StructType().add("a", IntegerType).add("b", DoubleType))
+
+ private lazy val right = sqlContext.createDataFrame(
+ sparkContext.parallelize(Seq(
+ Row(2, 3.0),
+ Row(2, 3.0),
+ Row(3, 2.0),
+ Row(4, 1.0),
+ Row(null, null),
+ Row(null, 5.0),
+ Row(6, null)
+ )), new StructType().add("c", IntegerType).add("d", DoubleType))
+
+ private lazy val condition = {
+ And((left.col("a") === right.col("c")).expr,
+ LessThan(left.col("b").expr, right.col("d").expr))
+ }
+
+ private lazy val conditionNEQ = {
+ And((left.col("a") < right.col("c")).expr,
+ LessThan(left.col("b").expr, right.col("d").expr))
+ }
+
+ // Note: the input dataframes and expression must be evaluated lazily because
+ // the SQLContext should be used only within a test to keep SQL tests stable
+ private def testExistenceJoin(
+ testName: String,
+ joinType: JoinType,
+ leftRows: => DataFrame,
+ rightRows: => DataFrame,
+ condition: => Expression,
+ expectedAnswer: Seq[Row]): Unit = {
+
+ def extractJoinParts(): Option[ExtractEquiJoinKeys.ReturnType] = {
+ val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition))
+ ExtractEquiJoinKeys.unapply(join)
+ }
+
+ test(s"$testName using ShuffledHashJoin") {
+ extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+ checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
+ EnsureRequirements(left.sqlContext.sessionState.conf).apply(
+ ShuffledHashJoin(
+ leftKeys, rightKeys, joinType, BuildRight, boundCondition, left, right)),
+ expectedAnswer,
+ sortAnswers = true)
+ }
+ }
+ }
+
+ test(s"$testName using BroadcastHashJoin") {
+ extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+ checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
+ EnsureRequirements(left.sqlContext.sessionState.conf).apply(
+ BroadcastHashJoin(
+ leftKeys, rightKeys, joinType, BuildRight, boundCondition, left, right)),
+ expectedAnswer,
+ sortAnswers = true)
+ }
+ }
+ }
+
+ test(s"$testName using BroadcastNestedLoopJoin build left") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+ checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
+ EnsureRequirements(left.sqlContext.sessionState.conf).apply(
+ BroadcastNestedLoopJoin(left, right, BuildLeft, joinType, Some(condition))),
+ expectedAnswer,
+ sortAnswers = true)
+ }
+ }
+
+ test(s"$testName using BroadcastNestedLoopJoin build right") {
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+ checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
+ EnsureRequirements(left.sqlContext.sessionState.conf).apply(
+ BroadcastNestedLoopJoin(left, right, BuildRight, joinType, Some(condition))),
+ expectedAnswer,
+ sortAnswers = true)
+ }
+ }
+ }
+
+ testExistenceJoin(
+ "basic test for left semi join",
+ LeftSemi,
+ left,
+ right,
+ condition,
+ Seq(Row(2, 1.0), Row(2, 1.0)))
+
+ testExistenceJoin(
+ "basic test for left semi non equal join",
+ LeftSemi,
+ left,
+ right,
+ conditionNEQ,
+ Seq(Row(1, 2.0), Row(1, 2.0), Row(2, 1.0), Row(2, 1.0)))
+
+ testExistenceJoin(
+ "basic test for anti join",
+ LeftAnti,
+ left,
+ right,
+ condition,
+ Seq(Row(1, 2.0), Row(1, 2.0), Row(3, 3.0), Row(6, null), Row(null, 5.0), Row(null, null)))
+
+ testExistenceJoin(
+ "basic test for anti non equal join",
+ LeftAnti,
+ left,
+ right,
+ conditionNEQ,
+ Seq(Row(3, 3.0), Row(6, null), Row(null, 5.0), Row(null, null)))
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
deleted file mode 100644
index 985a96f..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.joins
-
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan}
-import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
-import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
-import org.apache.spark.sql.execution.exchange.EnsureRequirements
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
-
-class SemiJoinSuite extends SparkPlanTest with SharedSQLContext {
-
- private lazy val left = sqlContext.createDataFrame(
- sparkContext.parallelize(Seq(
- Row(1, 2.0),
- Row(1, 2.0),
- Row(2, 1.0),
- Row(2, 1.0),
- Row(3, 3.0),
- Row(null, null),
- Row(null, 5.0),
- Row(6, null)
- )), new StructType().add("a", IntegerType).add("b", DoubleType))
-
- private lazy val right = sqlContext.createDataFrame(
- sparkContext.parallelize(Seq(
- Row(2, 3.0),
- Row(2, 3.0),
- Row(3, 2.0),
- Row(4, 1.0),
- Row(null, null),
- Row(null, 5.0),
- Row(6, null)
- )), new StructType().add("c", IntegerType).add("d", DoubleType))
-
- private lazy val condition = {
- And((left.col("a") === right.col("c")).expr,
- LessThan(left.col("b").expr, right.col("d").expr))
- }
-
- // Note: the input dataframes and expression must be evaluated lazily because
- // the SQLContext should be used only within a test to keep SQL tests stable
- private def testLeftSemiJoin(
- testName: String,
- leftRows: => DataFrame,
- rightRows: => DataFrame,
- condition: => Expression,
- expectedAnswer: Seq[Product]): Unit = {
-
- def extractJoinParts(): Option[ExtractEquiJoinKeys.ReturnType] = {
- val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition))
- ExtractEquiJoinKeys.unapply(join)
- }
-
- test(s"$testName using ShuffledHashJoin") {
- extractJoinParts().foreach { case (joinType, leftKeys, rightKeys, boundCondition, _, _) =>
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- EnsureRequirements(left.sqlContext.sessionState.conf).apply(
- ShuffledHashJoin(
- leftKeys, rightKeys, LeftSemi, BuildRight, boundCondition, left, right)),
- expectedAnswer.map(Row.fromTuple),
- sortAnswers = true)
- }
- }
- }
-
- test(s"$testName using BroadcastHashJoin") {
- extractJoinParts().foreach { case (joinType, leftKeys, rightKeys, boundCondition, _, _) =>
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- BroadcastHashJoin(
- leftKeys, rightKeys, LeftSemi, BuildRight, boundCondition, left, right),
- expectedAnswer.map(Row.fromTuple),
- sortAnswers = true)
- }
- }
- }
-
- test(s"$testName using BroadcastNestedLoopJoin build left") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- BroadcastNestedLoopJoin(left, right, BuildLeft, LeftSemi, Some(condition)),
- expectedAnswer.map(Row.fromTuple),
- sortAnswers = true)
- }
- }
-
- test(s"$testName using BroadcastNestedLoopJoin build right") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
- BroadcastNestedLoopJoin(left, right, BuildRight, LeftSemi, Some(condition)),
- expectedAnswer.map(Row.fromTuple),
- sortAnswers = true)
- }
- }
- }
-
- testLeftSemiJoin(
- "basic test",
- left,
- right,
- condition,
- Seq(
- (2, 1.0),
- (2, 1.0)
- )
- )
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/d7659227/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index cff24e2..b992fda 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -92,7 +92,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
DataSinks,
Scripts,
Aggregation,
- LeftSemiJoin,
+ ExistenceJoin,
EquiJoinSelection,
BasicOperators,
BroadcastNestedLoop,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org