You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2015/09/02 23:50:48 UTC

[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/8579

    [SPARK-9730] [SQL] Add Full Outer Join support for SortMergeJoin

    This PR is based on #8383 , thanks to @viirya 
    
    JIRA: https://issues.apache.org/jira/browse/SPARK-9730
    
    This patch adds the Full Outer Join support for SortMergeJoin. A new class SortMergeFullJoinScanner is added to scan rows from left and right iterators. FullOuterIterator is simply a wrapper of type RowIterator to consume joined rows from SortMergeFullJoinScanner.
    
    Closes #8383 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark smj_fullouter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8579
    
----
commit 5703d5685ab66742db22f9f6b826fa9cee8f39d5
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-08-23T17:06:00Z

    Add Full Outer Join support for SortMergeJoin.

commit 3cc1cb810d2a8ac63fb72be48066dd41ed16f9b4
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-08-24T05:58:33Z

    Merge remote-tracking branch 'upstream/master' into smj-fullouter

commit ddc5f825439dcc480d4b05beca9b9ab844da1797
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-08-24T06:12:19Z

    Update ExtractEquiJoinKeys for SortMergeJoin.

commit 4a3178b8231825d25d29cdd530f6b55b6775774a
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-08-24T09:27:47Z

    Fix outputPartitioning and NPE bug. Fix a test.

commit 06e0f74c489e9f0bdb8b889ca8e1802aa4c0a91e
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-08-24T16:17:24Z

    Can not compare two rows when there are only nulls.

commit 0559a40e0202de5a4886672d422812d1a86d0576
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-08-25T17:28:18Z

    Fix UnsafeRow.allNull.

commit 27b6044a388aa62eb18baf7b859099c489f0df26
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-08-26T04:43:27Z

    Should use anyNull instead of allNull.

commit 74a601b3260837ec3c2111e90e5055d9b483472a
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-09-02T15:23:36Z

    Merge remote-tracking branch 'upstream/master' into smj-fullouter

commit addf5fe04c667d3e57a776eebcddf0b82de69b3e
Author: Liang-Chi Hsieh <vi...@appier.com>
Date:   2015-09-02T15:26:58Z

    For comment.

commit 8a81df4eac38ec17be81c15a0a0cd51e340c7a04
Author: Davies Liu <da...@databricks.com>
Date:   2015-09-02T21:35:19Z

    refactor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975823
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // matched with condition
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    +      leftIndex += 1
    +      rightIndex = 0
    +      if (!leftMatched.get(leftIndex - 1)) {
    +        joinedRow(leftMatches(leftIndex - 1), rightNullRow)
    +        return true
    +      }
    +    }
    +
    +    while (rightIndex < rightMatches.size && rightMatched.get(rightIndex)) {
    +      rightIndex += 1
    +    }
    +    if (rightIndex < rightMatches.size) {
    +      joinedRow(leftNullRow, rightMatches(rightIndex))
    +      rightIndex += 1
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  // --- Public methods --------------------------------------------------------------------------
    +
    +  def getJoinedRow(): JoinedRow = joinedRow
    +
    +  def advanceNext(): Boolean = {
    +    // We already buffered some matching rows, use them directly
    +    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
    +      if (scanNextInBuffered()) {
    +        return true
    +      }
    +    }
    +
    +    // Both left and right iterators have rows and they can be compared
    +    if (leftRow != null && (leftRowKey.anyNull || rightRow == null)) {
    +      // Only consume row(s) from left iterator
    +      joinedRow(leftRow.copy(), rightNullRow)
    +      advancedLeft()
    +      true
    +    } else if (rightRow != null && (rightRowKey.anyNull || leftRow == null)) {
    +      // Only consume row(s) from right iterator
    --- End diff --
    
    it's always a single row right? Also this comment doesn't really add much value so I think we can just remove it. Same as the one in L445


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138699178
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42144/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137254994
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38785328
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -71,6 +75,8 @@ case class SortMergeOuterJoin(
         // For left and right outer joins, the output is ordered by the streamed input's join keys.
         case LeftOuter => requiredOrders(leftKeys)
         case RightOuter => requiredOrders(rightKeys)
    +    // there are null rows between each stream, so there is no order
    --- End diff --
    
    `// There are null rows in both streams, so there is no order`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38980901
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // matched with condition
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    +      leftIndex += 1
    +      rightIndex = 0
    +      if (!leftMatched.get(leftIndex - 1)) {
    +        joinedRow(leftMatches(leftIndex - 1), rightNullRow)
    +        return true
    +      }
    +    }
    +
    +    while (rightIndex < rightMatches.size && rightMatched.get(rightIndex)) {
    +      rightIndex += 1
    +    }
    +    if (rightIndex < rightMatches.size) {
    +      joinedRow(leftNullRow, rightMatches(rightIndex))
    +      rightIndex += 1
    +      true
    +    } else {
    +      false
    +    }
    --- End diff --
    
    The assert is not true, because it can start with leftIndex = leftMatches.size and 0< rightIndex <= rightMatches.size


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137910504
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42035/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38708955
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -132,15 +132,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             joins.BroadcastHashOuterJoin(
               leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil
     
    -      case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, right)
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
             if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
    --- End diff --
    
    Good question, isOrderable only checks the data type, rightKeys should have the same types as leftKeys.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138734763
  
      [Test build #42166 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42166/consoleFull) for   PR 8579 at commit [`c861d72`](https://github.com/apache/spark/commit/c861d7204ffc380a950616d39331df3f202882c3).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138732898
  
      [Test build #42154 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42154/console) for   PR 8579 at commit [`c861d72`](https://github.com/apache/spark/commit/c861d7204ffc380a950616d39331df3f202882c3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975215
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    --- End diff --
    
    can you put this line in a new method like the one you had before so it's easier to read:
    ```
    /**
     * Whether joining the current rows in the left and right buffers returns a valid match.
     * Note: this method mutates `joinedRow` to point to the current rows in the buffers.
     */
    private def isValidMatch: Boolean = {
      boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975999
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    --- End diff --
    
    This actually describes a side-effect of the method, not its main purpose. I think we should add the following:
    ```
    /**
     * Populate the left and right buffers with rows matching the provided key.
     * This consumes rows from both iterators until their keys are different from the matching key.
     */
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137256666
  
      [Test build #41946 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41946/consoleFull) for   PR 8579 at commit [`8a81df4`](https://github.com/apache/spark/commit/8a81df4eac38ec17be81c15a0a0cd51e340c7a04).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138711081
  
    LGTM. Maybe @JoshRosen would like to take a pass before this gets merged?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137255024
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137909760
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137277936
  
      [Test build #41946 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41946/console) for   PR 8579 at commit [`8a81df4`](https://github.com/apache/spark/commit/8a81df4eac38ec17be81c15a0a0cd51e340c7a04).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137910503
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138756976
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38784898
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,203 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullJoinScanner(
    --- End diff --
    
    can you call this `SortMergeFullOuterJoinScanner`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137910372
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38708327
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -132,15 +132,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             joins.BroadcastHashOuterJoin(
               leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil
     
    -      case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, right)
    +      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
             if sqlContext.conf.sortMergeJoinEnabled && RowOrdering.isOrderable(leftKeys) =>
    --- End diff --
    
    for `RightOuter` and `FullOuter` do we need to make sure the right keys are order-able?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138756982
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42166/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138706596
  
      [Test build #42154 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42154/consoleFull) for   PR 8579 at commit [`c861d72`](https://github.com/apache/spark/commit/c861d7204ffc380a950616d39331df3f202882c3).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137287379
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138699033
  
      [Test build #42144 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42144/console) for   PR 8579 at commit [`aff34ee`](https://github.com/apache/spark/commit/aff34eea38159a2bc5bb9d2eb9308bc3c0db5c9f).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138659722
  
      [Test build #42144 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42144/consoleFull) for   PR 8579 at commit [`aff34ee`](https://github.com/apache/spark/commit/aff34eea38159a2bc5bb9d2eb9308bc3c0db5c9f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975742
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // matched with condition
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    +      leftIndex += 1
    +      rightIndex = 0
    +      if (!leftMatched.get(leftIndex - 1)) {
    +        joinedRow(leftMatches(leftIndex - 1), rightNullRow)
    +        return true
    +      }
    +    }
    +
    +    while (rightIndex < rightMatches.size && rightMatched.get(rightIndex)) {
    +      rightIndex += 1
    +    }
    +    if (rightIndex < rightMatches.size) {
    +      joinedRow(leftNullRow, rightMatches(rightIndex))
    +      rightIndex += 1
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  // --- Public methods --------------------------------------------------------------------------
    +
    +  def getJoinedRow(): JoinedRow = joinedRow
    +
    +  def advanceNext(): Boolean = {
    +    // We already buffered some matching rows, use them directly
    --- End diff --
    
    `If we already buffered [...]` (we're outside the `if` case)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137910275
  
      [Test build #42035 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42035/consoleFull) for   PR 8579 at commit [`ce57f67`](https://github.com/apache/spark/commit/ce57f67d7291896056d1a23a0a7e2fe8bb5ad1e3).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138734662
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137910374
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42034/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137277982
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41946/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138656903
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138732951
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-139049449
  
    @andrewor14 Is this ready to go?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975270
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    --- End diff --
    
    maybe expand on this doc a little:
    ```
    /**
     * Scan the left and right buffers for the next valid match.
     *
     * Note: this method mutates `joinedRow` to point to the latest matching rows in the buffers.
     * If a left row has no valid matches on the right, or a right row has no valid matches on the
     * left, then the row is joined with the null row and the result is considered a valid match.
     *
     * @return true if a valid match is found, false otherwise.
     */
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138733977
  
    retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137910501
  
      [Test build #42035 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42035/console) for   PR 8579 at commit [`ce57f67`](https://github.com/apache/spark/commit/ce57f67d7291896056d1a23a0a7e2fe8bb5ad1e3).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137909582
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38976588
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala ---
    @@ -97,17 +97,17 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext {
                 }
             }
           }
    +    }
     
    -      test(s"$testName using SortMergeOuterJoin") {
    -        extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
    -            withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
    -              checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
    -                EnsureRequirements(sqlContext).apply(
    -                  SortMergeOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)),
    -                expectedAnswer.map(Row.fromTuple),
    -                sortAnswers = false)
    -            }
    -        }
    +    test(s"$testName using SortMergeOuterJoin") {
    +      extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) =>
    +          withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
    +            checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) =>
    +              EnsureRequirements(sqlContext).apply(
    +                SortMergeOuterJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)),
    +              expectedAnswer.map(Row.fromTuple),
    +              sortAnswers = true)
    +          }
    --- End diff --
    
    not your code, but we should unindent the `withSQLConf` here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137259497
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137287380
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41948/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38784804
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/BitSet.scala ---
    @@ -33,6 +33,17 @@ class BitSet(numBits: Int) extends Serializable {
       def capacity: Int = numWords * 64
     
       /**
    +   * Clear all setted bits.
    --- End diff --
    
    Clear all set bits


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138699177
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975477
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // matched with condition
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    +      leftIndex += 1
    +      rightIndex = 0
    +      if (!leftMatched.get(leftIndex - 1)) {
    +        joinedRow(leftMatches(leftIndex - 1), rightNullRow)
    +        return true
    +      }
    +    }
    +
    +    while (rightIndex < rightMatches.size && rightMatched.get(rightIndex)) {
    +      rightIndex += 1
    +    }
    +    if (rightIndex < rightMatches.size) {
    +      joinedRow(leftNullRow, rightMatches(rightIndex))
    +      rightIndex += 1
    +      true
    +    } else {
    +      false
    +    }
    --- End diff --
    
    I think this block is easier to follow if you rewrite it as follows:
    ```
    // Find right rows that have no matches on the left, and join them with the null row
    assert(rightIndex == 0, "right index should have been reset to 0")
    while (rightIndex < rightMatches.size) {
      if (!rightMatched.get(rightIndex)) {
        joinedRow(leftNullRow, rightMatches(rightIndex))
        rightIndex += 1
        return true
      }
      rightIndex += 1
    }
    
    // There are no more valid matches in the left and right buffers
    false
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138734619
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138703283
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137909616
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975863
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // matched with condition
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    +      leftIndex += 1
    +      rightIndex = 0
    +      if (!leftMatched.get(leftIndex - 1)) {
    +        joinedRow(leftMatches(leftIndex - 1), rightNullRow)
    +        return true
    +      }
    +    }
    +
    +    while (rightIndex < rightMatches.size && rightMatched.get(rightIndex)) {
    +      rightIndex += 1
    +    }
    +    if (rightIndex < rightMatches.size) {
    +      joinedRow(leftNullRow, rightMatches(rightIndex))
    +      rightIndex += 1
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  // --- Public methods --------------------------------------------------------------------------
    +
    +  def getJoinedRow(): JoinedRow = joinedRow
    +
    +  def advanceNext(): Boolean = {
    +    // We already buffered some matching rows, use them directly
    +    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
    +      if (scanNextInBuffered()) {
    +        return true
    +      }
    +    }
    +
    +    // Both left and right iterators have rows and they can be compared
    +    if (leftRow != null && (leftRowKey.anyNull || rightRow == null)) {
    +      // Only consume row(s) from left iterator
    +      joinedRow(leftRow.copy(), rightNullRow)
    +      advancedLeft()
    +      true
    +    } else if (rightRow != null && (rightRowKey.anyNull || leftRow == null)) {
    +      // Only consume row(s) from right iterator
    +      joinedRow(leftNullRow, rightRow.copy())
    +      advancedRight()
    +      true
    +    } else if (leftRow != null && rightRow != null) {
    +      // find matching rows from both sides
    --- End diff --
    
    ```
    // Both rows are present and neither have null values,
    // so we populate the buffers with rows matching the next key
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38788635
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,203 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their key are different than matchingKey.
    --- End diff --
    
    until their keys are different from the provided `matchingKey`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137909758
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975669
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // matched with condition
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    +      leftIndex += 1
    +      rightIndex = 0
    +      if (!leftMatched.get(leftIndex - 1)) {
    +        joinedRow(leftMatches(leftIndex - 1), rightNullRow)
    +        return true
    +      }
    +    }
    +
    +    while (rightIndex < rightMatches.size && rightMatched.get(rightIndex)) {
    +      rightIndex += 1
    +    }
    +    if (rightIndex < rightMatches.size) {
    +      joinedRow(leftNullRow, rightMatches(rightIndex))
    +      rightIndex += 1
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  // --- Public methods --------------------------------------------------------------------------
    +
    +  def getJoinedRow(): JoinedRow = joinedRow
    +
    +  def advanceNext(): Boolean = {
    +    // We already buffered some matching rows, use them directly
    +    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
    +      if (scanNextInBuffered()) {
    +        return true
    +      }
    +    }
    +
    +    // Both left and right iterators have rows and they can be compared
    --- End diff --
    
    This comment is not necessarily true (e.g. `rightRow` can be null)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137287302
  
      [Test build #41948 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41948/console) for   PR 8579 at commit [`16756a6`](https://github.com/apache/spark/commit/16756a6120effea72bda039b5d0e09a24087504e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975588
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // matched with condition
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    --- End diff --
    
    same here. I find the following easier to read. I believe it's functionally equivalent.
    ```
    // Given a left row, return the first valid matching row on the right
    while (rightIndex < rightMatches.size) {
      if (isValidMatch) {
        leftMatched.set(leftIndex)
        rightMatched.set(rightIndex)
        rightIndex += 1
        return true
      }
      rightIndex += 1
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137277981
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138756687
  
      [Test build #42166 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42166/console) for   PR 8579 at commit [`c861d72`](https://github.com/apache/spark/commit/c861d7204ffc380a950616d39331df3f202882c3).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138732952
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42154/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138692449
  
    @davies I looked at this patch in great detail and believe it's correct and very efficient. This looks good pending a few comments and readability improvements I suggested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138703225
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975349
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Advance the left iterator and compute the new row's join key.
    +   * @return true if the left iterator returned a row and false otherwise.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    if (leftIter.advanceNext()) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +      true
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Advance the right iterator and compute the new row's join key.
    +   * @return true if the right iterator returned a row and false otherwise.
    +   */
    +  private def advancedRight(): Boolean = {
    +    if (rightIter.advanceNext()) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +      true
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Consume rows from both iterators until their keys are different from the provided matchingKey.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    leftMatches.clear()
    +    rightMatches.clear()
    +    leftIndex = 0
    +    rightIndex = 0
    +
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    if (leftMatches.size <= leftMatched.capacity) {
    +      leftMatched.clear()
    +    } else {
    +      leftMatched = new BitSet(leftMatches.size)
    +    }
    +    if (rightMatches.size <= rightMatched.capacity) {
    +      rightMatched.clear()
    +    } else {
    +      rightMatched = new BitSet(rightMatches.size)
    +    }
    +  }
    +
    +  /**
    +   * Scan for next matching rows in buffers of both sides.
    +   * @return true if next row(s) are found and false otherwise.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // matched with condition
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    +      leftIndex += 1
    +      rightIndex = 0
    +      if (!leftMatched.get(leftIndex - 1)) {
    --- End diff --
    
    Can you add a comment:
    ```
    // If this left row had no matches on the right, join it with the null row
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-138656857
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137260752
  
      [Test build #41948 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41948/consoleFull) for   PR 8579 at commit [`16756a6`](https://github.com/apache/spark/commit/16756a6120effea72bda039b5d0e09a24087504e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/8579


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9730] [SQL] Add Full Outer Join support...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8579#issuecomment-137259513
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org