Posted to reviews@spark.apache.org by adrian-wang <gi...@git.apache.org> on 2014/05/20 11:35:14 UTC
[GitHub] spark pull request: add support for left semi join
GitHub user adrian-wang opened a pull request:
https://github.com/apache/spark/pull/837
add support for left semi join
Just submitting another solution for #395
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/adrian-wang/spark left-semi-join-support
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/837.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #837
----
commit 14cff800ff2c33ffcb1eaa70bab8109b163cc9d0
Author: Daoyuan <da...@intel.com>
Date: 2014-05-20T09:33:32Z
add support for left semi join
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-45418856
(I deleted my earlier comment because I found a mistake)
I think this is looking pretty good, but we should at least add one test for the Broadcast Nested Loop version. Here's a PR against your branch that does that: https://github.com/adrian-wang/spark/pull/1
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43727532
Merged build finished.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-44922845
Merged build started.
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43775032
Merged build started.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-45431143
All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15540/
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43605156
Can one of the admins verify this patch?
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-45429982
Merged build started.
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43788086
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15121/
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43729450
Merged build started.
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43729433
Merged build triggered.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-45431142
Merged build finished. All automated tests passed.
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43788084
Merged build finished.
---
[GitHub] spark pull request: add support for left semi join
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/837#discussion_r13262687
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -144,6 +144,150 @@ case class HashJoin(
* :: DeveloperApi ::
*/
@DeveloperApi
+case class LeftSemiJoinHash(
+ leftKeys: Seq[Expression],
+ rightKeys: Seq[Expression],
+ buildSide: BuildSide,
+ left: SparkPlan,
+ right: SparkPlan) extends BinaryNode {
+
+ override def outputPartitioning: Partitioning = left.outputPartitioning
+
+ override def requiredChildDistribution =
+ ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+ val (buildPlan, streamedPlan) = buildSide match {
+ case BuildLeft => (left, right)
+ case BuildRight => (right, left)
+ }
+
+ val (buildKeys, streamedKeys) = buildSide match {
+ case BuildLeft => (leftKeys, rightKeys)
+ case BuildRight => (rightKeys, leftKeys)
+ }
+
+ def output = left.output
+
+ @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
+ @transient lazy val streamSideKeyGenerator =
+ () => new MutableProjection(streamedKeys, streamedPlan.output)
+
+ def execute() = {
+
+ buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
+ // TODO: Use Spark's HashMap implementation.
+ val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
+ var currentRow: Row = null
+
+ // Create a mapping of buildKeys -> rows
+ while (buildIter.hasNext) {
+ currentRow = buildIter.next()
+ val rowKey = buildSideKeyGenerator(currentRow)
+ if(!rowKey.anyNull) {
+ val existingMatchList = hashTable.get(rowKey)
+ val matchList = if (existingMatchList == null) {
+ val newMatchList = new ArrayBuffer[Row]()
+ hashTable.put(rowKey, newMatchList)
+ newMatchList
+ } else {
+ existingMatchList
+ }
+ matchList += currentRow.copy()
+ }
+ }
+
+ new Iterator[Row] {
+ private[this] var currentStreamedRow: Row = _
+ private[this] var currentHashMatched: Boolean = false
+
+ private[this] val joinKeys = streamSideKeyGenerator()
+
+ override final def hasNext: Boolean =
+ streamIter.hasNext && fetchNext()
+
+ override final def next() = {
+ currentStreamedRow
--- End diff --
Is this correct if the operator is created with BuildLeft instead of BuildRight? I think that would turn it into a RightSemiJoin. Perhaps we should just remove the option to build on the other side. I think you can then also safely simplify this to use a HashSet instead of a HashMap, which will reduce memory consumption significantly.
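The suggested simplification can be sketched outside the PR: for a left semi join that always builds on the right, a set of build-side keys is all the state required. The sketch below is illustrative Python, not the PR's Scala API; all names are invented for this example.

```python
def left_semi_join_hash(left_rows, right_rows, left_key, right_key):
    # Build phase: collect right-side keys into a set, skipping null
    # keys (mirrors the `rowKey.anyNull` check in the quoted operator).
    # A set suffices because a semi join never needs the matched rows.
    build_keys = {right_key(r) for r in right_rows if right_key(r) is not None}
    # Probe phase: emit each left row at most once, iff its key matches.
    return [r for r in left_rows
            if left_key(r) is not None and left_key(r) in build_keys]
```

Compared with the `HashMap[Row, ArrayBuffer[Row]]` in the diff, this keeps one key per distinct build value and no row copies, which is the memory saving the review describes.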
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/837#discussion_r13520365
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
@@ -119,6 +119,11 @@ object HashFilteredJoin extends Logging with PredicateHelper {
case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
+ // All predicates can be evaluated for left semi join (those that are in the WHERE
+ // clause can only come from the left table, so they can all be pushed down.)
--- End diff --
I think in general we should avoid making too many assumptions in the planner about what optimizations have occurred. For example, in the future we might avoid pushing down predicates that are very expensive to evaluate as it might be cheaper to run them on an already filtered set of data. However, in the case of LEFT SEMI JOIN, I think it is actually okay to push all evaluation into the join condition, even if they only refer to the left table. Is that true?
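The claimed equivalence can be checked with a toy model: for a left semi join, filtering the left side first and folding the same left-only predicate into the join condition give identical output, because a semi join emits each qualifying left row exactly once either way. A minimal Python sketch with invented names, not Catalyst code:

```python
def semi_join(left, right, cond):
    # Keep each left row at most once, iff some right row satisfies cond.
    return [l for l in left if any(cond(l, r) for r in right)]

left = [1, 2, 3, 4]
right = [2, 3, 9]

# Left-only predicate (l > 2) applied before the join...
pre_filtered = semi_join([l for l in left if l > 2], right,
                         lambda l, r: l == r)
# ...or pushed into the join condition: identical output.
folded = semi_join(left, right, lambda l, r: l > 2 and l == r)
```

For an inner join the same move would also be safe semantically; the cost concern in the comment is about evaluating an expensive predicate against more rows, not about correctness.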
---
[GitHub] spark pull request: add support for left semi join
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/837#discussion_r13262607
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -144,6 +144,150 @@ case class HashJoin(
* :: DeveloperApi ::
*/
@DeveloperApi
+case class LeftSemiJoinHash(
+ leftKeys: Seq[Expression],
--- End diff --
Indent only 4 spaces here.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-45526760
Thanks! I've merged this into 1.0 and master.
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43775013
Merged build triggered.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-44927038
All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15374/
---
[GitHub] spark pull request: add support for left semi join
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-44756469
This is getting closer. Thanks for working on it!
---
[GitHub] spark pull request: add support for left semi join
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-44756501
Can you add "[SPARK-1495][SQL]" to the PR title?
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-44927036
Merged build finished. All automated tests passed.
---
[GitHub] spark pull request: add support for left semi join
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/837#discussion_r13262730
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -144,6 +144,150 @@ case class HashJoin(
* :: DeveloperApi ::
*/
@DeveloperApi
+case class LeftSemiJoinHash(
+ leftKeys: Seq[Expression],
+ rightKeys: Seq[Expression],
+ buildSide: BuildSide,
+ left: SparkPlan,
+ right: SparkPlan) extends BinaryNode {
+
+ override def outputPartitioning: Partitioning = left.outputPartitioning
+
+ override def requiredChildDistribution =
+ ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+ val (buildPlan, streamedPlan) = buildSide match {
+ case BuildLeft => (left, right)
+ case BuildRight => (right, left)
+ }
+
+ val (buildKeys, streamedKeys) = buildSide match {
+ case BuildLeft => (leftKeys, rightKeys)
+ case BuildRight => (rightKeys, leftKeys)
+ }
+
+ def output = left.output
+
+ @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
+ @transient lazy val streamSideKeyGenerator =
+ () => new MutableProjection(streamedKeys, streamedPlan.output)
+
+ def execute() = {
+
+ buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
+ // TODO: Use Spark's HashMap implementation.
+ val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
+ var currentRow: Row = null
+
+ // Create a mapping of buildKeys -> rows
+ while (buildIter.hasNext) {
+ currentRow = buildIter.next()
+ val rowKey = buildSideKeyGenerator(currentRow)
+ if(!rowKey.anyNull) {
+ val existingMatchList = hashTable.get(rowKey)
+ val matchList = if (existingMatchList == null) {
+ val newMatchList = new ArrayBuffer[Row]()
+ hashTable.put(rowKey, newMatchList)
+ newMatchList
+ } else {
+ existingMatchList
+ }
+ matchList += currentRow.copy()
+ }
+ }
+
+ new Iterator[Row] {
+ private[this] var currentStreamedRow: Row = _
+ private[this] var currentHashMatched: Boolean = false
+
+ private[this] val joinKeys = streamSideKeyGenerator()
+
+ override final def hasNext: Boolean =
+ streamIter.hasNext && fetchNext()
+
+ override final def next() = {
+ currentStreamedRow
+ }
+
+ /**
+ * Searches the streamed iterator for the next row that has at least one match in hashtable.
+ *
+ * @return true if the search is successful, and false if the streamed iterator runs out of
+ * tuples.
+ */
+ private final def fetchNext(): Boolean = {
+ currentHashMatched = false
+ while (!currentHashMatched && streamIter.hasNext) {
+ currentStreamedRow = streamIter.next()
+ if (!joinKeys(currentStreamedRow).anyNull) {
+ currentHashMatched = true
+ }
+ }
+ currentHashMatched
+ }
+ }
+ }
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class LeftSemiJoinBNL(
--- End diff --
I don't think this operator is exercised by the included test cases. We should add a test where the join condition cannot be evaluated with hash keys.
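For context, the nested-loop operator under discussion handles exactly the conditions a hash join cannot, e.g. inequalities. A language-neutral sketch of its logic in Python (illustrative names only, not the PR's code):

```python
def left_semi_join_bnl(streamed, broadcast, cond):
    # For each streamed row, scan the (small, broadcast) side and stop
    # at the first match; a semi join never needs more than one match.
    return [s for s in streamed if any(cond(s, b) for b in broadcast)]
```

A non-equi condition like `s >= b + 2` cannot be turned into hash keys, so only a test with such a condition would reach this code path.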
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43737611
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15116/
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-44923039
Hi Michael, when I was adding the Scala doc, I realized that if the join is not calculated in LeftSemiJoinHash, it simply means there are no join keys for the left semi join. In that case, if I push down those predicates and conditions (all of them can be pushed down), I only need to check whether the right table is empty to decide whether to output the left table. So LeftSemiJoinBNL would be largely redundant. Am I right?
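Restating that argument as a sketch (illustrative Python, not the PR's code): if there are no join keys and every predicate has already been pushed below the join, the remaining unconditional left semi join reduces to an emptiness check on the right side.

```python
def left_semi_join_no_condition(left, right):
    # With no remaining join condition, every left row "matches"
    # iff the right side has at least one row.
    return list(left) if right else []
```

Whether all conditions really can be pushed down in every case is exactly the question the reviewers debate above.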
---
[GitHub] spark pull request: add support for left semi join
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/837#discussion_r13262710
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -144,6 +144,150 @@ case class HashJoin(
* :: DeveloperApi ::
*/
@DeveloperApi
+case class LeftSemiJoinHash(
+ leftKeys: Seq[Expression],
+ rightKeys: Seq[Expression],
+ buildSide: BuildSide,
+ left: SparkPlan,
+ right: SparkPlan) extends BinaryNode {
+
+ override def outputPartitioning: Partitioning = left.outputPartitioning
+
+ override def requiredChildDistribution =
+ ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+ val (buildPlan, streamedPlan) = buildSide match {
+ case BuildLeft => (left, right)
+ case BuildRight => (right, left)
+ }
+
+ val (buildKeys, streamedKeys) = buildSide match {
+ case BuildLeft => (leftKeys, rightKeys)
+ case BuildRight => (rightKeys, leftKeys)
+ }
+
+ def output = left.output
+
+ @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
+ @transient lazy val streamSideKeyGenerator =
+ () => new MutableProjection(streamedKeys, streamedPlan.output)
+
+ def execute() = {
+
+ buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
+ // TODO: Use Spark's HashMap implementation.
+ val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
+ var currentRow: Row = null
+
+ // Create a mapping of buildKeys -> rows
+ while (buildIter.hasNext) {
+ currentRow = buildIter.next()
+ val rowKey = buildSideKeyGenerator(currentRow)
+ if(!rowKey.anyNull) {
+ val existingMatchList = hashTable.get(rowKey)
+ val matchList = if (existingMatchList == null) {
+ val newMatchList = new ArrayBuffer[Row]()
+ hashTable.put(rowKey, newMatchList)
+ newMatchList
+ } else {
+ existingMatchList
+ }
+ matchList += currentRow.copy()
+ }
+ }
+
+ new Iterator[Row] {
+ private[this] var currentStreamedRow: Row = _
+ private[this] var currentHashMatched: Boolean = false
+
+ private[this] val joinKeys = streamSideKeyGenerator()
+
+ override final def hasNext: Boolean =
+ streamIter.hasNext && fetchNext()
+
+ override final def next() = {
+ currentStreamedRow
+ }
+
+ /**
+ * Searches the streamed iterator for the next row that has at least one match in hashtable.
+ *
+ * @return true if the search is successful, and false if the streamed iterator runs out of
+ * tuples.
+ */
+ private final def fetchNext(): Boolean = {
+ currentHashMatched = false
+ while (!currentHashMatched && streamIter.hasNext) {
+ currentStreamedRow = streamIter.next()
+ if (!joinKeys(currentStreamedRow).anyNull) {
+ currentHashMatched = true
+ }
+ }
+ currentHashMatched
+ }
+ }
+ }
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class LeftSemiJoinBNL(
+ streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression])
+ (@transient sc: SparkContext)
+ extends BinaryNode {
+ // TODO: Override requiredChildDistribution.
+
+ override def outputPartitioning: Partitioning = streamed.outputPartitioning
+
+ override def otherCopyArgs = sc :: Nil
+
+ def output = left.output
+
+ /** The Streamed Relation */
+ def left = streamed
+ /** The Broadcast relation */
+ def right = broadcast
+
+ @transient lazy val boundCondition =
+ InterpretedPredicate(
+ condition
+ .map(c => BindReferences.bindReference(c, left.output ++ right.output))
+ .getOrElse(Literal(true)))
+
+
+ def execute() = {
+ val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
+
+ val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter =>
+ val joinedRow = new JoinedRow
+
+ streamedIter.filter(streamedRow => {
+ var i = 0
+ var matched = false
+
+ while (i < broadcastedRelation.value.size && !matched) {
+ // TODO: One bitset per partition instead of per row.
+ val broadcastedRow = broadcastedRelation.value(i)
+ if (boundCondition(joinedRow(streamedRow, broadcastedRow))) {
+ matched = true
+ }
+ i += 1
+ }
+ matched
+ }).map(streamedRow => (streamedRow, null))
+ }
+
+ streamedPlusMatches.map(_._1)
--- End diff --
I think you can remove the above tuple creation as well as this line that removes it.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-45418467
I think this is looking pretty good. One problem is that there are no tests for the nested loop version. I tried adding this to SQLQuerySuite:
```scala
test("left semi greater than predicate") {
checkAnswer(
sql("SELECT * FROM testData2 x LEFT SEMI JOIN testData2 y ON x.a >= y.a + 2"),
Seq((3,1), (3,2))
)
}
```
However this points out that we need to fix the other join strategies to avoid matching semi joins:
```scala
[info] - left semi greater than predicate *** FAILED *** (174 milliseconds)
[info] Results do not match for query:
...
[info] == Physical Plan ==
[info] Project [a#18:0,b#19:1,a#20:2,b#21:3]
[info] Filter (a#18:0 >= (a#20:2 + 2))
[info] CartesianProduct
[info] ExistingRdd [a#18,b#19], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174
[info] ExistingRdd [a#20,b#21], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174
[info]
[info] == Results ==
[info] !== Correct Answer - 2 == == Spark Answer - 4 ==
[info] !Vector(3, 1) [3,1,1,1]
[info] !Vector(3, 2) [3,1,1,2]
[info] ! [3,2,1,1]
[info] ! [3,2,1,2] (QueryTest.scala:54)
```
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43737610
Merged build finished.
---
[GitHub] spark pull request: add support for left semi join
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/837#discussion_r13262640
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -144,6 +144,150 @@ case class HashJoin(
* :: DeveloperApi ::
--- End diff --
I realize that we aren't particularly good about this in most of the other physical operators, but could you add some Scala doc here about how this operator works and what the expected performance characteristics are? Same below. The goal of the Scala doc for physical operators should be to make it easy for people to understand query plans that are printed out by EXPLAIN.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-44922835
Merged build triggered.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/837
---
[GitHub] spark pull request: add support for left semi join
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43727297
Jenkins, add to whitelist.
---
[GitHub] spark pull request: add support for left semi join
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/837#discussion_r13262704
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -144,6 +144,150 @@ case class HashJoin(
* :: DeveloperApi ::
*/
@DeveloperApi
+case class LeftSemiJoinHash(
+ leftKeys: Seq[Expression],
+ rightKeys: Seq[Expression],
+ buildSide: BuildSide,
+ left: SparkPlan,
+ right: SparkPlan) extends BinaryNode {
+
+ override def outputPartitioning: Partitioning = left.outputPartitioning
+
+ override def requiredChildDistribution =
+ ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+ val (buildPlan, streamedPlan) = buildSide match {
+ case BuildLeft => (left, right)
+ case BuildRight => (right, left)
+ }
+
+ val (buildKeys, streamedKeys) = buildSide match {
+ case BuildLeft => (leftKeys, rightKeys)
+ case BuildRight => (rightKeys, leftKeys)
+ }
+
+ def output = left.output
+
+ @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
+ @transient lazy val streamSideKeyGenerator =
+ () => new MutableProjection(streamedKeys, streamedPlan.output)
+
+ def execute() = {
+
+ buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
+ // TODO: Use Spark's HashMap implementation.
+ val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
+ var currentRow: Row = null
+
+ // Create a mapping of buildKeys -> rows
+ while (buildIter.hasNext) {
+ currentRow = buildIter.next()
+ val rowKey = buildSideKeyGenerator(currentRow)
+ if (!rowKey.anyNull) {
+ val existingMatchList = hashTable.get(rowKey)
+ val matchList = if (existingMatchList == null) {
+ val newMatchList = new ArrayBuffer[Row]()
+ hashTable.put(rowKey, newMatchList)
+ newMatchList
+ } else {
+ existingMatchList
+ }
+ matchList += currentRow.copy()
+ }
+ }
+
+ new Iterator[Row] {
+ private[this] var currentStreamedRow: Row = _
+ private[this] var currentHashMatched: Boolean = false
+
+ private[this] val joinKeys = streamSideKeyGenerator()
+
+ override final def hasNext: Boolean =
+ streamIter.hasNext && fetchNext()
+
+ override final def next() = {
+ currentStreamedRow
+ }
+
+ /**
+ * Searches the streamed iterator for the next row that has at least one match in the hash table.
+ *
+ * @return true if the search is successful, and false if the streamed iterator runs out of
+ * tuples.
+ */
+ private final def fetchNext(): Boolean = {
+ currentHashMatched = false
+ while (!currentHashMatched && streamIter.hasNext) {
+ currentStreamedRow = streamIter.next()
+ val rowKey = joinKeys(currentStreamedRow)
+ if (!rowKey.anyNull && hashTable.containsKey(rowKey)) {
+ currentHashMatched = true
+ }
+ }
+ currentHashMatched
+ }
+ }
+ }
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class LeftSemiJoinBNL(
+ streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression])
+ (@transient sc: SparkContext)
+ extends BinaryNode {
+ // TODO: Override requiredChildDistribution.
+
+ override def outputPartitioning: Partitioning = streamed.outputPartitioning
+
+ override def otherCopyArgs = sc :: Nil
+
+ def output = left.output
+
+ /** The Streamed Relation */
+ def left = streamed
+ /** The Broadcast relation */
+ def right = broadcast
+
+ @transient lazy val boundCondition =
+ InterpretedPredicate(
+ condition
+ .map(c => BindReferences.bindReference(c, left.output ++ right.output))
+ .getOrElse(Literal(true)))
+
+
+ def execute() = {
+ val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
+
+ val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter =>
+ val joinedRow = new JoinedRow
+
+ streamedIter.filter(streamedRow => {
+ var i = 0
+ var matched = false
+
+ while (i < broadcastedRelation.value.size && !matched) {
+ // TODO: One bitset per partition instead of per row.
--- End diff --
Is this comment stale?
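Stepping back from the diff, the hash variant's semantics can be sketched standalone over plain Scala collections. This is an illustrative model only — `leftSemiJoin` and the tuple-based rows are made up for the sketch, not Spark's `Row`/`Projection` API:

```scala
// Hash-based left semi join over plain collections.
// A row is modeled as (joinKey, payload).
def leftSemiJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, A)] = {
  // Build phase: only key *presence* matters for a semi join,
  // so a set of the build side's keys is enough.
  val buildKeys: Set[K] = right.map(_._1).toSet
  // Probe phase: keep each left row at most once if its key matches.
  left.filter { case (k, _) => buildKeys.contains(k) }
}

val leftRows  = Seq((1, "a"), (2, "b"), (3, "c"))
val rightRows = Seq((2, "x"), (2, "y"), (4, "z"))
// Left rows are never duplicated, even when the right side has duplicate keys.
println(leftSemiJoin(leftRows, rightRows))  // List((2,b))
```

Note the probe emits the streamed row itself, never columns from the build side — which is why the operator's `output` is just `left.output`.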
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43727534
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15115/
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on a diff in the pull request:
https://github.com/apache/spark/pull/837#discussion_r13522951
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
@@ -119,6 +119,11 @@ object HashFilteredJoin extends Logging with PredicateHelper {
case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
+ // All predicates can be evaluated for a left semi join (those in the WHERE
+ // clause can only come from the left table, so they can all be pushed down).
--- End diff --
Yes, I think a LEFT SEMI JOIN would not suffer from pushing down predicates.
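The claim that left-table predicates commute with a left semi join can be sanity-checked with a small standalone sketch (illustrative names, not Catalyst code): because the join only filters the left side and never adds or duplicates rows, filtering before the join equals filtering after it.

```scala
// Semi join modeled as a key-membership filter on the left side.
def semiJoin[K, A](left: Seq[(K, A)], rightKeys: Set[K]): Seq[(K, A)] =
  left.filter { case (k, _) => rightKeys.contains(k) }

val leftRows  = Seq((1, 10), (2, 20), (3, 30))
val rightKeys = Set(1, 2)
val p = (row: (Int, Int)) => row._2 > 15  // predicate over left columns only

val pushedDown  = semiJoin(leftRows.filter(p), rightKeys)  // filter first
val appliedLast = semiJoin(leftRows, rightKeys).filter(p)  // join first
assert(pushedDown == appliedLast)
println(pushedDown)  // List((2,20))
```

This only holds because the predicate references left-side columns alone; a predicate mentioning right-side columns could not be pushed this way.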
---
[GitHub] spark pull request: add support for left semi join
Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43605087
This solution is combined with #418 from @marmbrus.
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43727343
Merged build started.
---
[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-45429745
Merged build triggered.
---
[GitHub] spark pull request: add support for left semi join
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/837#issuecomment-43727320
Merged build triggered.
---