Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2016/02/23 22:02:17 UTC

[GitHub] spark pull request: [SPARK-12313] improve performance of Broadcast...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/11328

    [SPARK-12313] improve performance of BroadcastNestedLoopJoin

    ## What changes were proposed in this pull request?
    
    Currently, BroadcastNestedLoopJoin is implemented only for the worst case; it is too slow and can easily hang forever. This PR adds a fast path for some combinations of joinType and buildSide, and also improves the worst case (which will use much less memory than before).
    
    ## How was this patch tested?
    
    existing unit tests.
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark nested_loop

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11328.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11328
    
----
commit ff875d152fc64dfa1c55ab7bc45208d61bb1d5d4
Author: Davies Liu <da...@databricks.com>
Date:   2016-02-23T20:58:34Z

    improve BroadcastNestedLoopJoin

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188046622
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187952149
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51796/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187951631
  
    **[Test build #51796 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51796/consoleFull)** for PR 11328 at commit [`ff875d1`](https://github.com/apache/spark/commit/ff875d152fc64dfa1c55ab7bc45208d61bb1d5d4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-189127122
  
    **[Test build #52026 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52026/consoleFull)** for PR 11328 at commit [`6fe9768`](https://github.com/apache/spark/commit/6fe97686202350b67d3553544af0f515333628a3).




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-189147010
  
    **[Test build #52026 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52026/consoleFull)** for PR 11328 at commit [`6fe9768`](https://github.com/apache/spark/commit/6fe97686202350b67d3553544af0f515333628a3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188091391
  
    Can you update the description to include more information? For example, which join types are being sped up, and how you avoid hanging. Thanks.





[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11328




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188123474
  
    Thanks for the update!





[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-189345508
  
    **[Test build #2586 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2586/consoleFull)** for PR 11328 at commit [`6fe9768`](https://github.com/apache/spark/commit/6fe97686202350b67d3553544af0f515333628a3).




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-189147153
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11328#discussion_r54207667
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---
    @@ -51,125 +51,254 @@ case class BroadcastNestedLoopJoin(
       }
     
       private[this] def genResultProjection: InternalRow => InternalRow = {
    -    UnsafeProjection.create(schema)
    +    if (joinType == LeftSemi) {
    +      UnsafeProjection.create(output, output)
    +    } else {
    +      // Always put the stream side on left to simplify implementation
    +      UnsafeProjection.create(output, streamed.output ++ broadcast.output)
    +    }
       }
     
       override def outputPartitioning: Partitioning = streamed.outputPartitioning
     
       override def output: Seq[Attribute] = {
         joinType match {
    +      case Inner =>
    +        left.output ++ right.output
           case LeftOuter =>
             left.output ++ right.output.map(_.withNullability(true))
           case RightOuter =>
             left.output.map(_.withNullability(true)) ++ right.output
           case FullOuter =>
             left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    -      case Inner =>
    -        // TODO we can avoid breaking the lineage, since we union an empty RDD for Inner Join case
    -        left.output ++ right.output
    -      case x => // TODO support the Left Semi Join
    +      case LeftSemi =>
    +        left.output
    +      case x =>
             throw new IllegalArgumentException(
               s"BroadcastNestedLoopJoin should not take $x as the JoinType")
         }
       }
     
    -  @transient private lazy val boundCondition =
    -    newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +  @transient private lazy val boundCondition = {
    +    if (condition.isDefined) {
    +      newPredicate(condition.get, streamed.output ++ broadcast.output)
    +    } else {
    +      (r: InternalRow) => true
    +    }
    +  }
     
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    val numOutputRows = longMetric("numOutputRows")
    +  private def innerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
     
    -    val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    +        } else {
    +          joinedRows
    +        }
    +      }
    +    }
    +  }
     
    -    /** All rows that either match both-way, or rows from streamed joined with nulls. */
    -    val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
    -      val relation = broadcastedRelation.value
    +  private def outerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
    +      val nulls = new GenericMutableRow(broadcast.output.size)
    +
    +      // Returns an iterator to avoid copy the rows.
    +      new Iterator[InternalRow] {
    +        // current row from stream side
    +        private var streamRow: InternalRow = null
    +        // have found a match for current row or not
    +        private var foundMatch: Boolean = false
    +        // the matched result row
    +        private var resultRow: InternalRow = null
    +        // the next index of buildRows to try
    +        private var nextIndex: Int = 0
     
    -      val matchedRows = new CompactBuffer[InternalRow]
    -      val includedBroadcastTuples = new BitSet(relation.length)
    +        private def findNextMatch(): Boolean = {
    +          if (streamRow == null) {
    +            if (!streamedIter.hasNext) {
    +              return false
    +            }
    +            streamRow = streamedIter.next()
    +            nextIndex = 0
    +            foundMatch = false
    +          }
    +          while (nextIndex < buildRows.length) {
    +            resultRow = joinedRow(streamRow, buildRows(nextIndex))
    +            nextIndex += 1
    +            if (boundCondition(resultRow)) {
    +              foundMatch = true
    +              return true
    +            }
    +          }
    +          if (!foundMatch) {
    +            resultRow = joinedRow(streamRow, nulls)
    +            streamRow = null
    +            true
    +          } else {
    +            resultRow = null
    +            streamRow = null
    +            findNextMatch()
    --- End diff --
    
    Recursion makes this easier to understand, and it goes at most two levels deep (when the streamed row has looped over all of its matches, the next streamed row will either find a match or produce nulls).
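
The recursion bound described in this comment can be illustrated with a minimal stand-alone sketch. This is not the code from the PR: rows are plain Scala values rather than `InternalRow`, `None` stands in for the row of nulls, and the names `OuterJoinSketch` and `outerJoin` are hypothetical. The recursive call only happens after a streamed row that already produced matches; the fresh row it picks up either matches or is emitted with `None`, so a third level is never reached.

```scala
// Hypothetical model of the outer-join iterator from the diff above.
// Assumption: rows are plain values and None replaces the null row.
object OuterJoinSketch {
  def outerJoin[A, B](
      streamed: Iterator[A],
      build: IndexedSeq[B])(cond: (A, B) => Boolean): Iterator[(A, Option[B])] =
    new Iterator[(A, Option[B])] {
      private var streamRow: Option[A] = None            // current stream-side row
      private var foundMatch = false                     // has it matched anything yet?
      private var nextIndex = 0                          // next build row to try
      private var result: Option[(A, Option[B])] = None  // buffered output row

      private def findNextMatch(): Boolean = {
        if (streamRow.isEmpty) {
          if (!streamed.hasNext) return false
          streamRow = Some(streamed.next())
          nextIndex = 0
          foundMatch = false
        }
        val row = streamRow.get
        while (nextIndex < build.length) {
          val candidate = build(nextIndex)
          nextIndex += 1
          if (cond(row, candidate)) {
            foundMatch = true
            result = Some((row, Some(candidate)))
            return true
          }
        }
        // The build side is exhausted for this streamed row.
        streamRow = None
        if (!foundMatch) {
          result = Some((row, None)) // unmatched row: pad the build side
          true
        } else {
          findNextMatch() // matches were already emitted: move to the next row
        }
      }

      override def hasNext: Boolean = result.isDefined || findNextMatch()

      override def next(): (A, Option[B]) = {
        if (!hasNext) throw new NoSuchElementException("next on empty iterator")
        val r = result.get
        result = None
        r
      }
    }
}
```

Because the iterator buffers a single output row at a time, it never materializes the matches for a streamed row, which is the point of returning an iterator in the diff.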




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187960303
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187995839
  
    **[Test build #2574 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2574/consoleFull)** for PR 11328 at commit [`9fd01be`](https://github.com/apache/spark/commit/9fd01beefeefa75fa396bb045e7f835178c5d7bc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188101079
  
    @rxin Updated.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11328#discussion_r54207470
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---
    @@ -51,125 +51,254 @@ case class BroadcastNestedLoopJoin(
       }
     
       private[this] def genResultProjection: InternalRow => InternalRow = {
    -    UnsafeProjection.create(schema)
    +    if (joinType == LeftSemi) {
    +      UnsafeProjection.create(output, output)
    +    } else {
    +      // Always put the stream side on left to simplify implementation
    +      UnsafeProjection.create(output, streamed.output ++ broadcast.output)
    +    }
       }
     
       override def outputPartitioning: Partitioning = streamed.outputPartitioning
     
       override def output: Seq[Attribute] = {
         joinType match {
    +      case Inner =>
    +        left.output ++ right.output
           case LeftOuter =>
             left.output ++ right.output.map(_.withNullability(true))
           case RightOuter =>
             left.output.map(_.withNullability(true)) ++ right.output
           case FullOuter =>
             left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    -      case Inner =>
    -        // TODO we can avoid breaking the lineage, since we union an empty RDD for Inner Join case
    -        left.output ++ right.output
    -      case x => // TODO support the Left Semi Join
    +      case LeftSemi =>
    +        left.output
    +      case x =>
             throw new IllegalArgumentException(
               s"BroadcastNestedLoopJoin should not take $x as the JoinType")
         }
       }
     
    -  @transient private lazy val boundCondition =
    -    newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +  @transient private lazy val boundCondition = {
    +    if (condition.isDefined) {
    +      newPredicate(condition.get, streamed.output ++ broadcast.output)
    +    } else {
    +      (r: InternalRow) => true
    +    }
    +  }
     
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    val numOutputRows = longMetric("numOutputRows")
    +  private def innerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
     
    -    val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    +        } else {
    +          joinedRows
    +        }
    +      }
    +    }
    +  }
     
    -    /** All rows that either match both-way, or rows from streamed joined with nulls. */
    -    val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
    -      val relation = broadcastedRelation.value
    +  private def outerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    --- End diff --
    
    Both LeftOuter and RightOuter
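
One way to read this, given the "Always put the stream side on left" comment in the diff, is that a single code path can serve both join types because joined rows are always built as streamed columns followed by broadcast columns, and a final projection restores the declared output order. The sketch below is an assumption-laden illustration of that reordering only: rows are modelled as `Vector[Any]`, and `StreamLeftSketch` and `reorder` are hypothetical names, not Spark APIs.

```scala
// Hypothetical illustration of "stream side on the left, then project".
// Assumption: a joined row is a Vector whose first streamedWidth columns
// come from the streamed side and whose remaining columns are broadcast.
object StreamLeftSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  def reorder(joined: Vector[Any], streamedWidth: Int, buildSide: BuildSide): Vector[Any] =
    buildSide match {
      // The streamed side is the left input, so the row is already in output order.
      case BuildRight => joined
      // The broadcast side is the left input, so its columns must come first.
      case BuildLeft =>
        val (streamedCols, broadcastCols) = joined.splitAt(streamedWidth)
        broadcastCols ++ streamedCols
    }
}
```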




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-189396098
  
    Merging this into master.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11328#discussion_r53857174
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---
    @@ -58,56 +58,173 @@ case class BroadcastNestedLoopJoin(
     
       override def output: Seq[Attribute] = {
         joinType match {
    +      case Inner =>
    +        left.output ++ right.output
           case LeftOuter =>
             left.output ++ right.output.map(_.withNullability(true))
           case RightOuter =>
             left.output.map(_.withNullability(true)) ++ right.output
           case FullOuter =>
             left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    -      case Inner =>
    -        // TODO we can avoid breaking the lineage, since we union an empty RDD for Inner Join case
    -        left.output ++ right.output
    -      case x => // TODO support the Left Semi Join
    +      case LeftSemi =>
    +        left.output
    +      case x =>
             throw new IllegalArgumentException(
               s"BroadcastNestedLoopJoin should not take $x as the JoinType")
         }
       }
     
    -  @transient private lazy val boundCondition =
    -    newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +  @transient private lazy val boundCondition = {
    +    if (condition.isDefined) {
    +      newPredicate(condition.get, left.output ++ right.output)
    +    } else {
    +      (r: InternalRow) => true
    +    }
    +  }
     
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    val numOutputRows = longMetric("numOutputRows")
    +  private def innerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
     
    -    val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildSide match {
    +          case BuildRight =>
    +            buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +          case BuildLeft =>
    +            buildRows.iterator.map(r => joinedRow(r, streamedRow))
    +        }
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    +        } else {
    +          joinedRows
    +        }
    +      }
    +    }
    +  }
    +
    +  private def leftOuterJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    assert(buildSide == BuildRight)
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
    +
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    --- End diff --
    
    Why is this right? If it fails on the join condition, it should put null on the other side.
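
The concern raised here can be shown with a small stand-alone sketch over plain Scala collections. It is not Spark code: `LeftOuterSketch`, `filterOnly`, and `leftOuter` are hypothetical names, and `None` stands in for the null-padded side. Filtering the joined pairs alone silently drops left rows that match nothing, which is inner-join behaviour.

```scala
// Hypothetical comparison of "filter only" against left outer semantics.
object LeftOuterSketch {
  // Filtering joined pairs: a left row with no match disappears entirely.
  def filterOnly[A, B](left: Seq[A], right: Seq[B])(
      cond: (A, B) => Boolean): Seq[(A, Option[B])] =
    for (l <- left; r <- right if cond(l, r)) yield (l, Some(r))

  // Left outer: a left row with no surviving match is kept and padded with None.
  def leftOuter[A, B](left: Seq[A], right: Seq[B])(
      cond: (A, B) => Boolean): Seq[(A, Option[B])] =
    left.flatMap { l =>
      val matches: Seq[Option[B]] = right.filter(r => cond(l, r)).map(r => Some(r))
      val padded: Seq[Option[B]] = if (matches.isEmpty) Seq(None) else matches
      padded.map(m => (l, m))
    }
}
```

With `left = Seq(1, 2)` and `right = Seq(1)` under an equality condition, `filterOnly` returns only the pair for `1`, while `leftOuter` also keeps `2` paired with `None`.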




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187954838
  
    I'm surprised that the bug was not caught by tests; I will add tests for that.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-189389691
  
    **[Test build #2586 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2586/consoleFull)** for PR 11328 at commit [`6fe9768`](https://github.com/apache/spark/commit/6fe97686202350b67d3553544af0f515333628a3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187960307
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51809/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187961999
  
    **[Test build #2574 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2574/consoleFull)** for PR 11328 at commit [`9fd01be`](https://github.com/apache/spark/commit/9fd01beefeefa75fa396bb045e7f835178c5d7bc).




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-189027628
  
    LGTM




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11328#discussion_r54163152
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---
    @@ -51,125 +51,254 @@ case class BroadcastNestedLoopJoin(
       }
     
       private[this] def genResultProjection: InternalRow => InternalRow = {
    -    UnsafeProjection.create(schema)
    +    if (joinType == LeftSemi) {
    +      UnsafeProjection.create(output, output)
    +    } else {
    +      // Always put the stream side on left to simplify implementation
    +      UnsafeProjection.create(output, streamed.output ++ broadcast.output)
    +    }
       }
     
       override def outputPartitioning: Partitioning = streamed.outputPartitioning
     
       override def output: Seq[Attribute] = {
         joinType match {
    +      case Inner =>
    +        left.output ++ right.output
           case LeftOuter =>
             left.output ++ right.output.map(_.withNullability(true))
           case RightOuter =>
             left.output.map(_.withNullability(true)) ++ right.output
           case FullOuter =>
             left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    -      case Inner =>
    -        // TODO we can avoid breaking the lineage, since we union an empty RDD for Inner Join case
    -        left.output ++ right.output
    -      case x => // TODO support the Left Semi Join
    +      case LeftSemi =>
    +        left.output
    +      case x =>
             throw new IllegalArgumentException(
               s"BroadcastNestedLoopJoin should not take $x as the JoinType")
         }
       }
     
    -  @transient private lazy val boundCondition =
    -    newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +  @transient private lazy val boundCondition = {
    +    if (condition.isDefined) {
    +      newPredicate(condition.get, streamed.output ++ broadcast.output)
    +    } else {
    +      (r: InternalRow) => true
    +    }
    +  }
     
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    val numOutputRows = longMetric("numOutputRows")
    +  private def innerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
     
    -    val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    +        } else {
    +          joinedRows
    +        }
    +      }
    +    }
    +  }
     
    -    /** All rows that either match both-way, or rows from streamed joined with nulls. */
    -    val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
    -      val relation = broadcastedRelation.value
    +  private def outerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
    +      val nulls = new GenericMutableRow(broadcast.output.size)
    +
    +      // Returns an iterator to avoid copy the rows.
    +      new Iterator[InternalRow] {
    +        // current row from stream side
    +        private var streamRow: InternalRow = null
    +        // have found a match for current row or not
    +        private var foundMatch: Boolean = false
    +        // the matched result row
    +        private var resultRow: InternalRow = null
    +        // the next index of buildRows to try
    +        private var nextIndex: Int = 0
     
    -      val matchedRows = new CompactBuffer[InternalRow]
    -      val includedBroadcastTuples = new BitSet(relation.length)
    +        private def findNextMatch(): Boolean = {
    +          if (streamRow == null) {
    +            if (!streamedIter.hasNext) {
    +              return false
    +            }
    +            streamRow = streamedIter.next()
    +            nextIndex = 0
    +            foundMatch = false
    +          }
    +          while (nextIndex < buildRows.length) {
    +            resultRow = joinedRow(streamRow, buildRows(nextIndex))
    +            nextIndex += 1
    +            if (boundCondition(resultRow)) {
    +              foundMatch = true
    +              return true
    +            }
    +          }
    +          if (!foundMatch) {
    +            resultRow = joinedRow(streamRow, nulls)
    +            streamRow = null
    +            true
    +          } else {
    +            resultRow = null
    +            streamRow = null
    +            findNextMatch()
    --- End diff --
    
    I don't think this should be recursive. How about a loop?
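
One way the suggested loop could look, as a minimal Spark-free sketch rather than the code that was eventually merged. The object `LoopOuterJoin`, the method `leftOuter`, the type parameters `S` and `B`, and the `matches` function are hypothetical stand-ins for the streamed rows, the broadcast rows, and `boundCondition`; `Option[B]` stands in for the row of nulls. The self-call in `findNextMatch` becomes an outer `while` that moves on to the next streamed row.

```scala
// Spark-free sketch of a left outer nested-loop join as a lazy iterator.
// S = streamed row type, B = build (broadcast) row type; both are hypothetical stand-ins.
object LoopOuterJoin {
  def leftOuter[S, B](
      streamed: Iterator[S],
      build: IndexedSeq[B],
      matches: (S, B) => Boolean): Iterator[(S, Option[B])] =
    new Iterator[(S, Option[B])] {
      // current row from the stream side, if one is in progress
      private var current: Option[S] = None
      // whether the current streamed row has matched any build row so far
      private var foundMatch = false
      // the next index of build to try
      private var nextIndex = 0
      // the pending result row, if one has been found but not yet returned
      private var result: Option[(S, Option[B])] = None

      // Iterative form: the outer loop replaces the recursive call.
      private def findNextMatch(): Boolean = {
        while (result.isEmpty) {
          if (current.isEmpty) {
            if (!streamed.hasNext) return false
            current = Some(streamed.next())
            nextIndex = 0
            foundMatch = false
          }
          val s = current.get
          // scan the remaining build rows for the next match
          while (result.isEmpty && nextIndex < build.length) {
            val b = build(nextIndex)
            nextIndex += 1
            if (matches(s, b)) {
              foundMatch = true
              result = Some((s, Some(b)))
            }
          }
          if (result.isEmpty) {
            // build side exhausted: emit the unmatched row once, then advance
            if (!foundMatch) result = Some((s, None))
            current = None
          }
        }
        true
      }

      override def hasNext: Boolean = result.isDefined || findNextMatch()

      override def next(): (S, Option[B]) = {
        if (!hasNext) throw new NoSuchElementException("no more joined rows")
        val r = result.get
        result = None
        r
      }
    }
}
```

For example, `LoopOuterJoin.leftOuter[Int, Int](Iterator(1, 2, 3), Vector(2, 2, 5), _ == _)` yields one row per match and a single `None`-padded row for each streamed value that matches nothing.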




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188013417
  
    **[Test build #51833 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51833/consoleFull)** for PR 11328 at commit [`e94ac50`](https://github.com/apache/spark/commit/e94ac50df1ca381e9ba263378f0cfba63cab3d16).




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188041010
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51811/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188041006
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188046131
  
    **[Test build #51833 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51833/consoleFull)** for PR 11328 at commit [`e94ac50`](https://github.com/apache/spark/commit/e94ac50df1ca381e9ba263378f0cfba63cab3d16).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11328#discussion_r53858161
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---
    @@ -58,56 +58,173 @@ case class BroadcastNestedLoopJoin(
     
       override def output: Seq[Attribute] = {
         joinType match {
    +      case Inner =>
    +        left.output ++ right.output
           case LeftOuter =>
             left.output ++ right.output.map(_.withNullability(true))
           case RightOuter =>
             left.output.map(_.withNullability(true)) ++ right.output
           case FullOuter =>
             left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    -      case Inner =>
    -        // TODO we can avoid breaking the lineage, since we union an empty RDD for Inner Join case
    -        left.output ++ right.output
    -      case x => // TODO support the Left Semi Join
    +      case LeftSemi =>
    +        left.output
    +      case x =>
             throw new IllegalArgumentException(
               s"BroadcastNestedLoopJoin should not take $x as the JoinType")
         }
       }
     
    -  @transient private lazy val boundCondition =
    -    newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +  @transient private lazy val boundCondition = {
    +    if (condition.isDefined) {
    +      newPredicate(condition.get, left.output ++ right.output)
    +    } else {
    +      (r: InternalRow) => true
    +    }
    +  }
     
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    val numOutputRows = longMetric("numOutputRows")
    +  private def innerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
     
    -    val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildSide match {
    +          case BuildRight =>
    +            buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +          case BuildLeft =>
    +            buildRows.iterator.map(r => joinedRow(r, streamedRow))
    +        }
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    +        } else {
    +          joinedRows
    +        }
    +      }
    +    }
    +  }
    +
    +  private def leftOuterJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    assert(buildSide == BuildRight)
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
    +
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    --- End diff --
    
    Missed that, will fix it, same for RightOuter.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188046627
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51833/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187915906
  
    **[Test build #51796 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51796/consoleFull)** for PR 11328 at commit [`ff875d1`](https://github.com/apache/spark/commit/ff875d152fc64dfa1c55ab7bc45208d61bb1d5d4).




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11328#discussion_r54163178
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---
    @@ -51,125 +51,254 @@ case class BroadcastNestedLoopJoin(
       }
     
       private[this] def genResultProjection: InternalRow => InternalRow = {
    -    UnsafeProjection.create(schema)
    +    if (joinType == LeftSemi) {
    +      UnsafeProjection.create(output, output)
    +    } else {
    +      // Always put the stream side on left to simplify implementation
    +      UnsafeProjection.create(output, streamed.output ++ broadcast.output)
    +    }
       }
     
       override def outputPartitioning: Partitioning = streamed.outputPartitioning
     
       override def output: Seq[Attribute] = {
         joinType match {
    +      case Inner =>
    +        left.output ++ right.output
           case LeftOuter =>
             left.output ++ right.output.map(_.withNullability(true))
           case RightOuter =>
             left.output.map(_.withNullability(true)) ++ right.output
           case FullOuter =>
             left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    -      case Inner =>
    -        // TODO we can avoid breaking the lineage, since we union an empty RDD for Inner Join case
    -        left.output ++ right.output
    -      case x => // TODO support the Left Semi Join
    +      case LeftSemi =>
    +        left.output
    +      case x =>
             throw new IllegalArgumentException(
               s"BroadcastNestedLoopJoin should not take $x as the JoinType")
         }
       }
     
    -  @transient private lazy val boundCondition =
    -    newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +  @transient private lazy val boundCondition = {
    +    if (condition.isDefined) {
    +      newPredicate(condition.get, streamed.output ++ broadcast.output)
    +    } else {
    +      (r: InternalRow) => true
    +    }
    +  }
     
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    val numOutputRows = longMetric("numOutputRows")
    +  private def innerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
     
    -    val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    +        } else {
    +          joinedRows
    +        }
    +      }
    +    }
    +  }
     
    -    /** All rows that either match both-way, or rows from streamed joined with nulls. */
    -    val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
    -      val relation = broadcastedRelation.value
    +  private def outerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    --- End diff --
    
    This is leftOuter, yes?




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-189147155
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/52026/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187957481
  
    **[Test build #51811 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51811/consoleFull)** for PR 11328 at commit [`9fd01be`](https://github.com/apache/spark/commit/9fd01beefeefa75fa396bb045e7f835178c5d7bc).




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-188040383
  
    **[Test build #51811 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51811/consoleFull)** for PR 11328 at commit [`9fd01be`](https://github.com/apache/spark/commit/9fd01beefeefa75fa396bb045e7f835178c5d7bc).
     * This patch **fails from timeout after a configured wait of \`250m\`**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11328#issuecomment-187952146
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12313] [SQL] improve performance of Bro...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11328#discussion_r54163596
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala ---
    @@ -51,125 +51,254 @@ case class BroadcastNestedLoopJoin(
       }
     
       private[this] def genResultProjection: InternalRow => InternalRow = {
    -    UnsafeProjection.create(schema)
    +    if (joinType == LeftSemi) {
    +      UnsafeProjection.create(output, output)
    +    } else {
    +      // Always put the stream side on left to simplify implementation
    +      UnsafeProjection.create(output, streamed.output ++ broadcast.output)
    +    }
       }
     
       override def outputPartitioning: Partitioning = streamed.outputPartitioning
     
       override def output: Seq[Attribute] = {
         joinType match {
    +      case Inner =>
    +        left.output ++ right.output
           case LeftOuter =>
             left.output ++ right.output.map(_.withNullability(true))
           case RightOuter =>
             left.output.map(_.withNullability(true)) ++ right.output
           case FullOuter =>
             left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
    -      case Inner =>
    -        // TODO we can avoid breaking the lineage, since we union an empty RDD for Inner Join case
    -        left.output ++ right.output
    -      case x => // TODO support the Left Semi Join
    +      case LeftSemi =>
    +        left.output
    +      case x =>
             throw new IllegalArgumentException(
               s"BroadcastNestedLoopJoin should not take $x as the JoinType")
         }
       }
     
    -  @transient private lazy val boundCondition =
    -    newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
    +  @transient private lazy val boundCondition = {
    +    if (condition.isDefined) {
    +      newPredicate(condition.get, streamed.output ++ broadcast.output)
    +    } else {
    +      (r: InternalRow) => true
    +    }
    +  }
     
    -  protected override def doExecute(): RDD[InternalRow] = {
    -    val numOutputRows = longMetric("numOutputRows")
    +  private def innerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
     
    -    val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
    +      streamedIter.flatMap { streamedRow =>
    +        val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
    +        if (condition.isDefined) {
    +          joinedRows.filter(boundCondition)
    +        } else {
    +          joinedRows
    +        }
    +      }
    +    }
    +  }
     
    -    /** All rows that either match both-way, or rows from streamed joined with nulls. */
    -    val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
    -      val relation = broadcastedRelation.value
    +  private def outerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
    +      val joinedRow = new JoinedRow
    +      val nulls = new GenericMutableRow(broadcast.output.size)
    +
    +      // Returns an iterator to avoid copy the rows.
    +      new Iterator[InternalRow] {
    +        // current row from stream side
    +        private var streamRow: InternalRow = null
    +        // have found a match for current row or not
    +        private var foundMatch: Boolean = false
    +        // the matched result row
    +        private var resultRow: InternalRow = null
    +        // the next index of buildRows to try
    +        private var nextIndex: Int = 0
     
    -      val matchedRows = new CompactBuffer[InternalRow]
    -      val includedBroadcastTuples = new BitSet(relation.length)
    +        private def findNextMatch(): Boolean = {
    +          if (streamRow == null) {
    +            if (!streamedIter.hasNext) {
    +              return false
    +            }
    +            streamRow = streamedIter.next()
    +            nextIndex = 0
    +            foundMatch = false
    +          }
    +          while (nextIndex < buildRows.length) {
    +            resultRow = joinedRow(streamRow, buildRows(nextIndex))
    +            nextIndex += 1
    +            if (boundCondition(resultRow)) {
    +              foundMatch = true
    +              return true
    +            }
    +          }
    +          if (!foundMatch) {
    +            resultRow = joinedRow(streamRow, nulls)
    +            streamRow = null
    +            true
    +          } else {
    +            resultRow = null
    +            streamRow = null
    +            findNextMatch()
    +          }
    +        }
    +
    +        override def hasNext(): Boolean = {
    +          resultRow != null || findNextMatch()
    +        }
    +        override def next(): InternalRow = {
    +          val r = resultRow
    +          resultRow = null
    +          r
    +        }
    +      }
    +    }
    +  }
    +
    +  private def leftSemiJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
    +    assert(buildSide == BuildRight)
    +    streamed.execute().mapPartitionsInternal { streamedIter =>
    +      val buildRows = relation.value
           val joinedRow = new JoinedRow
     
    -      val leftNulls = new GenericMutableRow(left.output.size)
    -      val rightNulls = new GenericMutableRow(right.output.size)
    -      val resultProj = genResultProjection
    +      if (condition.isDefined) {
    +        streamedIter.filter(l =>
    +          buildRows.exists(r => boundCondition(joinedRow(l, r)))
    +        )
    +      } else {
    +        streamedIter.filter(r => !buildRows.isEmpty)
    +      }
    +    }
    +  }
    +
    +  /**
    +   * The implementation these joins:
    --- End diff --
    
    "for these joins"

