You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by sujithjay <gi...@git.apache.org> on 2018/08/21 08:48:28 UTC

[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

GitHub user sujithjay opened a pull request:

    https://github.com/apache/spark/pull/22168

    [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Join in case of data skew

    ## What issue does this pull request address ?
    JIRA: [https://issues.apache.org/jira/browse/SPARK-24985](https://issues.apache.org/jira/browse/SPARK-24985)
    In the case of Full Outer Joins of large tables, in the presence of data skew around the join keys for either of the joined tables, OOMs exceptions occur. While its possible to increase the heap size to workaround, Spark should be resilient to such issues as skews can happen arbitrarily.
    
    ## What changes were proposed in this pull request?
    
    #16909 introduced `ExternalAppendOnlyUnsafeRowArray` & changed `SortMergeJoinExec` to use `ExternalAppendOnlyUnsafeRowArray` for every join, except 'Full Outer Join'. This PR makes changes to make 'Full Outer Joins' to use `ExternalAppendOnlyUnsafeRowArray`.
    
    ## How was this patch tested?
    #### Unit testing
    - Changed a test-case in `JoinSuite`.
    - Existing tests in `OuterJoinSuite` were used to verify correctness.
    
    #### Stress testing
    - This is still work in progress. I plan to verify this patch using a production workload.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sujithjay/spark SPARK-24985

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22168
    
----
commit ce70a4ef4d6410d0a738a5440dd2b7d91c7e4822
Author: sujithjay <su...@...>
Date:   2018-08-21T08:20:48Z

    [SPARK-24985][SQL] Fix OOM in Full Outer Join in presence of data skew.
    
    Change SortMergeJoinExec to use ExternalAppendOnlyUnsafeRowArray for Full Outer Join. This would spill data into disk if the buffered rows exceed beyond a threshold, thus preventing OOM errors.
    
    Change corresponding test case in JoinSuite.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211609440
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1028,23 +1034,23 @@ private class SortMergeFullOuterJoinScanner(
         rightIndex = 0
     
         while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    -      leftMatches += leftRow.copy()
    +      leftMatches.add(leftRow.copy().asInstanceOf[UnsafeRow])
    --- End diff --
    
    `ExternalAppendOnlyUnsafeRowArray` will handle copy, don't need to do another copy here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211619140
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
        * @return true if a valid match is found, false otherwise.
        */
       private def scanNextInBuffered(): Boolean = {
    -    while (leftIndex < leftMatches.size) {
    -      while (rightIndex < rightMatches.size) {
    -        joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
    -        if (boundCondition(joinedRow)) {
    -          leftMatched.set(leftIndex)
    -          rightMatched.set(rightIndex)
    +    val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
    +
    +    while (leftMatchesIterator.hasNext) {
    +      val leftCurRow = leftMatchesIterator.next()
    +      val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
    --- End diff --
    
    Can we keep the scanning left and right iterators? Because if they are spilled, obtaining the iterator from spilled data needs to loop over spill writers and create readers. We may avoid calling `generateIterator` every time for obtaining the iterators. However it might make the code a bit complicated than now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Join in ca...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22168
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211607297
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -975,8 +979,10 @@ private class SortMergeFullOuterJoinScanner(
     
       private[this] var leftIndex: Int = 0
       private[this] var rightIndex: Int = 0
    -  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    -  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    --- End diff --
    
    We can remove `scala.collection.mutable.ArrayBuffer` import now. Seems no other places use it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Join in ca...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22168
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Join in ca...

Posted by sujithjay <gi...@git.apache.org>.
Github user sujithjay commented on the issue:

    https://github.com/apache/spark/pull/22168
  
    Hi @tejasapatil, @viirya, @hvanhovell,  & @kiszk, can you please review this pull request?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211577003
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1099,7 +1111,7 @@ private class SortMergeFullOuterJoinScanner(
     
       def advanceNext(): Boolean = {
         // If we already buffered some matching rows, use them directly
    -    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
    +    if (leftIndex <= leftMatches.length || rightIndex <= rightMatches.length) {
    --- End diff --
    
    Why did you change size -> length?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Join in ca...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22168
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

Posted by sujithjay <gi...@git.apache.org>.
Github user sujithjay commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r236992453
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
        * @return true if a valid match is found, false otherwise.
        */
       private def scanNextInBuffered(): Boolean = {
    -    while (leftIndex < leftMatches.size) {
    -      while (rightIndex < rightMatches.size) {
    -        joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
    -        if (boundCondition(joinedRow)) {
    -          leftMatched.set(leftIndex)
    -          rightMatched.set(rightIndex)
    +    val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
    +
    +    while (leftMatchesIterator.hasNext) {
    +      val leftCurRow = leftMatchesIterator.next()
    +      val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
    --- End diff --
    
    Hi @viirya ,
    After some deliberation, I figure it would not be possible to avoid the reinitialisation of the right iterator. Please share your thought on this. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

Posted by sujithjay <gi...@git.apache.org>.
Github user sujithjay commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211695230
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1058,31 +1064,37 @@ private class SortMergeFullOuterJoinScanner(
        * @return true if a valid match is found, false otherwise.
        */
       private def scanNextInBuffered(): Boolean = {
    -    while (leftIndex < leftMatches.size) {
    -      while (rightIndex < rightMatches.size) {
    -        joinedRow(leftMatches(leftIndex), rightMatches(rightIndex))
    -        if (boundCondition(joinedRow)) {
    -          leftMatched.set(leftIndex)
    -          rightMatched.set(rightIndex)
    +    val leftMatchesIterator = leftMatches.generateIterator(leftIndex)
    +
    +    while (leftMatchesIterator.hasNext) {
    +      val leftCurRow = leftMatchesIterator.next()
    +      val rightMatchesIterator = rightMatches.generateIterator(rightIndex)
    --- End diff --
    
    Hi @viirya, 
    Thank you for reviewing the code. 
    
    I agree the code will be a bit complicated if we had to keep scanning the iterators. In this particular case, I was following a pattern observed throughout the rest of the class.
    For eg., https://github.com/apache/spark/blob/35f7f5ce83984d8afe0b7955942baa04f2bef74f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L307-L329
    
    Having said that, I do feel the penalty for following a similar approach in case of full outer joins could be higher. I will try & see what I can do.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...

Posted by sujithjay <gi...@git.apache.org>.
Github user sujithjay commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22168#discussion_r211579406
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
    @@ -1099,7 +1111,7 @@ private class SortMergeFullOuterJoinScanner(
     
       def advanceNext(): Boolean = {
         // If we already buffered some matching rows, use them directly
    -    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
    +    if (leftIndex <= leftMatches.length || rightIndex <= rightMatches.length) {
    --- End diff --
    
    I changed the type of `leftMatches` & `rightMatches` from `ArrayBuffer[InternalRow]` to `ExternalAppendOnlyUnsafeRowArray`. `ExternalAppendOnlyUnsafeRowArray` exposes a `length` method, instead of `size` method.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org