Posted to reviews@spark.apache.org by adrian-wang <gi...@git.apache.org> on 2014/04/12 00:01:12 UTC

[GitHub] spark pull request: support leftsemijoin for sparkSQL

GitHub user adrian-wang opened a pull request:

    https://github.com/apache/spark/pull/395

    support leftsemijoin for sparkSQL

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/adrian-wang/spark leftsemijoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #395
    
----
commit f0e1815cef8954f846c1eb6195fc8a6c9f931ce0
Author: Daoyuan <da...@intel.com>
Date:   2014-04-11T21:59:37Z

    support leftsemijoin for sparkSQL

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-40438109
  
    Thanks for your comments! Here's SPARK-1495 (https://issues.apache.org/jira/browse/SPARK-1495).



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-44897653
  
    Thanks, I have closed this.



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-43707969
  
    Just to mention it here: I have submitted another solution as #837.



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-40420386
  
    ok to test



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/395#discussion_r11650568
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -165,36 +165,64 @@ case class BroadcastNestedLoopJoin(
       def execute() = {
         val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
     
    -    val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter =>
    -      val matchedRows = new ArrayBuffer[Row]
    -      // TODO: Use Spark's BitSet.
    -      val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
    -      val joinedRow = new JoinedRow
    -
    -      streamedIter.foreach { streamedRow =>
    -        var i = 0
    -        var matched = false
    -
    -        while (i < broadcastedRelation.value.size) {
    -          // TODO: One bitset per partition instead of per row.
    -          val broadcastedRow = broadcastedRelation.value(i)
    -          if (boundCondition(joinedRow(streamedRow, broadcastedRow))) {
    -            matchedRows += buildRow(streamedRow ++ broadcastedRow)
    -            matched = true
    -            includedBroadcastTuples += i
    -          }
    -          i += 1
    +    val streamedPlusMatches = joinType match {
    +      case LeftSemi =>
    +        streamed.execute().mapPartitions {
    +          streamedIter =>
    +            val matchedRows = new ArrayBuffer[Row]
    --- End diff --
    
    Since the logic is pretty simple here, it might be better to stream the results through a custom iterator instead of buffering them all in memory.  It would also be good to do this match at a higher level, or even break this out into its own operator so that we don't need to build tuple objects for no reason.
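
The streaming suggestion above can be illustrated with a minimal sketch. It is not the code from this pull request: `Row` is a hypothetical stand-in for Spark SQL's row type, and `leftSemi` and `condition` are names made up for this example. The standard library's lazy `Iterator.filter` plays the role of the custom iterator, and `exists` stops scanning the broadcast side at the first match, so no `ArrayBuffer` of matched rows is needed.

```scala
object LeftSemiStreamingSketch {
  // Hypothetical stand-in for Spark SQL's Row type, used only for illustration.
  type Row = Seq[Any]

  /** Lazily emits each streamed row that matches at least one broadcast row.
    *
    * `condition` is an assumed join predicate over (streamedRow, broadcastRow).
    * Iterator.filter is lazy, so rows are produced one at a time as the caller
    * consumes the result; nothing is buffered per partition.
    */
  def leftSemi(
      streamed: Iterator[Row],
      broadcast: IndexedSeq[Row],
      condition: (Row, Row) => Boolean): Iterator[Row] =
    // exists short-circuits on the first matching broadcast row, which is
    // all a left semi join needs to know.
    streamed.filter(streamedRow => broadcast.exists(b => condition(streamedRow, b)))
}
```

Because the output is the streamed row itself, no joined tuple is ever built. With mutable, reused row objects (as in the real operator) the caller may still need to copy rows it retains.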



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-40436753
  
    Thanks for adding this!  It would be great if you could create a JIRA for tracking this new feature.  Also, right now HashJoin is only used for Inner joins, though it would be good to also extend that at some point (though maybe not in this PR).
    
    One design question is which of the following is better:
     - multiple operators that handle different kinds of joins, letting the planner pick the correct one
     - putting the switching logic inside of the operator as is done here
    
    I need to look at this code more closely, but will not have time to do that until after we start cutting release candidates for 1.0.
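
As a rough illustration of the first option in the design question above (separate operators, with the planner choosing), here is a minimal sketch. Every name in it is hypothetical and chosen for this example; none of them is Spark's actual planner or operator API.

```scala
object JoinPlanningSketch {
  // Hypothetical join types, mirroring the ones named in the discussion.
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType

  // Hypothetical physical operators: one general, one dedicated to semi joins.
  sealed trait PhysicalJoin
  case object GeneralNestedLoop extends PhysicalJoin
  case object SemiNestedLoop extends PhysicalJoin

  /** The planner, not the operator, switches on the join type. */
  def plan(joinType: JoinType): PhysicalJoin = joinType match {
    case LeftSemi => SemiNestedLoop
    case _        => GeneralNestedLoop
  }
}
```

With this split, each operator's `execute()` contains only the logic for its own join kinds, at the cost of one more class per specialised join. The second option keeps a single operator and does the same `match` inside it, as the diff in this pull request does.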



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-42795884
  
    I'll switch to a newer branch with #418 to split leftsemi from other joins.



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-40257801
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-42620921
  
    I have been busy with some other issues recently; I'll try to fix it this weekend.



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-40436922
  
    I'll create a JIRA soon.



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/395#discussion_r11649983
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -165,36 +165,64 @@ case class BroadcastNestedLoopJoin(
       def execute() = {
         val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
     
    -    val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter =>
    -      val matchedRows = new ArrayBuffer[Row]
    -      // TODO: Use Spark's BitSet.
    -      val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
    -      val joinedRow = new JoinedRow
    -
    -      streamedIter.foreach { streamedRow =>
    -        var i = 0
    -        var matched = false
    -
    -        while (i < broadcastedRelation.value.size) {
    -          // TODO: One bitset per partition instead of per row.
    -          val broadcastedRow = broadcastedRelation.value(i)
    -          if (boundCondition(joinedRow(streamedRow, broadcastedRow))) {
    -            matchedRows += buildRow(streamedRow ++ broadcastedRow)
    -            matched = true
    -            includedBroadcastTuples += i
    -          }
    -          i += 1
    +    val streamedPlusMatches = joinType match {
    +      case LeftSemi =>
    +        streamed.execute().mapPartitions {
    +          streamedIter =>
    +            val matchedRows = new ArrayBuffer[Row]
    +            val joinedRow = new JoinedRow
    +
    +            streamedIter.foreach {
    +              streamedRow =>
    +                var i = 0
    +                var matched = false
    +
    +                while (i < broadcastedRelation.value.size && !matched) {
    +                  // TODO: One bitset per partition instead of per row.
    +                  val broadcastedRow = broadcastedRelation.value(i)
    +                    if (boundCondition(joinedRow(streamedRow, broadcastedRow))) {
    +                      matchedRows += buildRow(streamedRow)
    --- End diff --
    
    There is no need to call `buildRow` here, as you can just use streamedRow.



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang closed the pull request at:

    https://github.com/apache/spark/pull/395



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-44888442
  
    Mind closing this version if it is subsumed by #837 ?  Thanks!



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-42614695
  
    Just checking in to see if there is anything I can help with here.  Would be cool to have this feature!



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/395#issuecomment-40434377
  
    Besides the BroadcastNestedLoopJoin, I think the left semi join may also need to be implemented in the HashJoin.
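
For equality conditions, a hash-based left semi join might look like the sketch below. It is an assumption-laden illustration, not Spark's `HashJoin`: `Row`, `leftSemi`, `leftKey` and `rightKey` are hypothetical names, and the whole right side is assumed to fit in memory as a set of keys.

```scala
object HashLeftSemiSketch {
  // Hypothetical stand-in for Spark SQL's Row type, used only for illustration.
  type Row = Seq[Any]

  /** Keeps each left row whose join key also occurs on the right side.
    *
    * Only the distinct right-side keys are stored, since a semi join never
    * outputs right-side columns. Null keys are dropped from the set so that
    * a null key never matches, following SQL equality semantics.
    */
  def leftSemi(
      left: Iterator[Row],
      right: Iterator[Row],
      leftKey: Row => Any,
      rightKey: Row => Any): Iterator[Row] = {
    val keys: Set[Any] = right.map(rightKey).filter(_ != null).toSet
    // Lazy filter: one hash lookup per left row instead of a nested loop.
    left.filter(row => keys.contains(leftKey(row)))
  }
}
```

Compared with the nested-loop version, each left row costs one lookup rather than a scan of the broadcast relation, but this only applies when the join condition is an equality on keys.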



[GitHub] spark pull request: support leftsemijoin for sparkSQL

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/395#discussion_r11650765
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -165,36 +165,64 @@ case class BroadcastNestedLoopJoin(
       def execute() = {
         val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
     
    -    val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter =>
    -      val matchedRows = new ArrayBuffer[Row]
    -      // TODO: Use Spark's BitSet.
    -      val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
    -      val joinedRow = new JoinedRow
    -
    -      streamedIter.foreach { streamedRow =>
    -        var i = 0
    -        var matched = false
    -
    -        while (i < broadcastedRelation.value.size) {
    -          // TODO: One bitset per partition instead of per row.
    -          val broadcastedRow = broadcastedRelation.value(i)
    -          if (boundCondition(joinedRow(streamedRow, broadcastedRow))) {
    -            matchedRows += buildRow(streamedRow ++ broadcastedRow)
    -            matched = true
    -            includedBroadcastTuples += i
    -          }
    -          i += 1
    +    val streamedPlusMatches = joinType match {
    +      case LeftSemi =>
    +        streamed.execute().mapPartitions {
    +          streamedIter =>
    +            val matchedRows = new ArrayBuffer[Row]
    +            val joinedRow = new JoinedRow
    +
    +            streamedIter.foreach {
    +              streamedRow =>
    +                var i = 0
    +                var matched = false
    +
    +                while (i < broadcastedRelation.value.size && !matched) {
    +                  // TODO: One bitset per partition instead of per row.
    +                  val broadcastedRow = broadcastedRelation.value(i)
    +                    if (boundCondition(joinedRow(streamedRow, broadcastedRow))) {
    +                      matchedRows += buildRow(streamedRow)
    +                      matched = true
    +                  }
    +                  i += 1
    +                }
    +            }
    +            Iterator((matchedRows, null))
             }
    -
    -        if (!matched && (joinType == LeftOuter || joinType == FullOuter)) {
    -          matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null))
    +      case _ =>
    +        streamed.execute().mapPartitions {
    +          streamedIter =>
    +            val matchedRows = new ArrayBuffer[Row]
    +            // TODO: Use Spark's BitSet.
    +            val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
    +            val joinedRow = new JoinedRow
    +
    +            streamedIter.foreach {
    +              streamedRow =>
    +                var i = 0
    +                var matched = false
    +
    +                while (i < broadcastedRelation.value.size) {
    +                  // TODO: One bitset per partition instead of per row.
    +                  val broadcastedRow = broadcastedRelation.value(i)
    +                  if (boundCondition(joinedRow(streamedRow, broadcastedRow))) {
    +                    matchedRows += buildRow(streamedRow ++ broadcastedRow)
    --- End diff --
    
    Existing: (I realize this was already here.)
    
    I think it would be cheaper to use a new `JoinedRow` here instead of `buildRow`.  `buildRow` is going to allocate a new Array and copy all the values instead of just the values for `streamedRow`.  You will also need to call `copy()` on `streamedRow`.
    
    A similar trick could be done below to avoid creating a new Array of nulls for each output row.
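
The idea behind this suggestion can be sketched as follows. `JoinedView` is a hypothetical class written for this example and only loosely modelled on what a joined row does; it is not Spark's `JoinedRow`, and `Row` is again a stand-in type.

```scala
object JoinedRowSketch {
  // Hypothetical stand-in for Spark SQL's Row type, used only for illustration.
  type Row = IndexedSeq[Any]

  /** Presents two rows as one logical row without copying either of them. */
  final class JoinedView(left: Row, right: Row) {
    def length: Int = left.length + right.length

    // Indexes below left.length read from the left row, the rest from the right.
    def apply(i: Int): Any =
      if (i < left.length) left(i) else right(i - left.length)
  }

  /** A row of `width` nulls, meant to be built once and shared by every
    * unmatched outer-join output row instead of being reallocated each time.
    */
  def nullRow(width: Int): Row = Vector.fill[Any](width)(null)
}
```

The rows here are immutable, so the view can hold references safely. In the operator under review the streamed row may be reused by the upstream iterator, which is presumably why the comment above asks for `copy()` on `streamedRow` before wrapping it.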

