Posted to reviews@spark.apache.org by adrian-wang <gi...@git.apache.org> on 2014/05/20 11:35:14 UTC

[GitHub] spark pull request: add support for left semi join

GitHub user adrian-wang opened a pull request:

    https://github.com/apache/spark/pull/837

    add support for left semi join

    Just submitted another solution for #395.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/adrian-wang/spark left-semi-join-support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/837.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #837
    
----
commit 14cff800ff2c33ffcb1eaa70bab8109b163cc9d0
Author: Daoyuan <da...@intel.com>
Date:   2014-05-20T09:33:32Z

    add support for left semi join

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-45418856
  
    (I deleted my earlier comment because I found a mistake)
    
    I think this is looking pretty good, but we should at least add one test for the Broadcast Nested Loop version.  Here's a PR against your branch that does that: https://github.com/adrian-wang/spark/pull/1



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43727532
  
    Merged build finished. 



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-44922845
  
    Merged build started. 



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43775032
  
    Merged build started. 



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-45431143
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15540/



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43605156
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-45429982
  
    Merged build started. 



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43788086
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15121/



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43729450
  
    Merged build started. 



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43729433
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-45431142
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43788084
  
    Merged build finished. 



[GitHub] spark pull request: add support for left semi join

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/837#discussion_r13262687
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -144,6 +144,150 @@ case class HashJoin(
      * :: DeveloperApi ::
      */
     @DeveloperApi
    +case class LeftSemiJoinHash(
    +                     leftKeys: Seq[Expression],
    +                     rightKeys: Seq[Expression],
    +                     buildSide: BuildSide,
    +                     left: SparkPlan,
    +                     right: SparkPlan) extends BinaryNode {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +  val (buildPlan, streamedPlan) = buildSide match {
    +    case BuildLeft => (left, right)
    +    case BuildRight => (right, left)
    +  }
    +
    +  val (buildKeys, streamedKeys) = buildSide match {
    +    case BuildLeft => (leftKeys, rightKeys)
    +    case BuildRight => (rightKeys, leftKeys)
    +  }
    +
    +  def output = left.output
    +
    +  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
    +  @transient lazy val streamSideKeyGenerator =
    +    () => new MutableProjection(streamedKeys, streamedPlan.output)
    +
    +  def execute() = {
    +
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
    +    // TODO: Use Spark's HashMap implementation.
    +      val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
    +      var currentRow: Row = null
    +
    +      // Create a mapping of buildKeys -> rows
    +      while (buildIter.hasNext) {
    +        currentRow = buildIter.next()
    +        val rowKey = buildSideKeyGenerator(currentRow)
    +        if(!rowKey.anyNull) {
    +          val existingMatchList = hashTable.get(rowKey)
    +          val matchList = if (existingMatchList == null) {
    +            val newMatchList = new ArrayBuffer[Row]()
    +            hashTable.put(rowKey, newMatchList)
    +            newMatchList
    +          } else {
    +            existingMatchList
    +          }
    +          matchList += currentRow.copy()
    +        }
    +      }
    +
    +      new Iterator[Row] {
    +        private[this] var currentStreamedRow: Row = _
    +        private[this] var currentHashMatched: Boolean = false
    +
    +        private[this] val joinKeys = streamSideKeyGenerator()
    +
    +        override final def hasNext: Boolean =
    +          streamIter.hasNext && fetchNext()
    +
    +        override final def next() = {
    +          currentStreamedRow
    --- End diff --
    
    Is this correct if the operator is created with BuildLeft instead of BuildRight?  I think that would turn it into a RightSemiJoin.  Perhaps we should just remove the option to build on the other side.  I think you can then also safely simplify this to use a HashSet instead of a HashMap, which will reduce memory consumption significantly.
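
    The reviewer's two suggestions (always build on the right side, and use a HashSet instead of a HashMap) can be sketched as follows. This is a hypothetical standalone illustration, not the PR's code: rows are modeled as plain key/payload pairs, whereas in Spark the key would come from a `Projection` over the child's output.

    ```scala
    import scala.collection.mutable

    // Hedged sketch of a hash-based left semi join that always builds on the
    // right side. A semi join only needs to know whether a key exists on the
    // build side, so a HashSet of keys replaces the HashMap[Row, ArrayBuffer[Row]].
    def leftSemiJoinHash[K, L](
        streamed: Iterator[(K, L)],   // left (streamed) side: (joinKey, row)
        build: Iterator[(K, _)]       // right (build) side: (joinKey, row)
    ): Iterator[(K, L)] = {
      val buildKeys = mutable.HashSet.empty[K]
      // Null join keys never match, mirroring the rowKey.anyNull check in the PR.
      build.foreach { case (k, _) => if (k != null) buildKeys += k }
      // Each left row is emitted at most once, regardless of right-side duplicates.
      streamed.filter { case (k, _) => k != null && buildKeys.contains(k) }
    }

    val out = leftSemiJoinHash(
      Iterator((1, "a"), (2, "b"), (3, "c")),
      Iterator((1, "x"), (3, "y"), (3, "z"))).toList
    // out == List((1, "a"), (3, "c"))
    ```

    Building from the right side is what keeps this a *left* semi join; allowing BuildLeft would, as the comment notes, effectively compute a right semi join.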



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/837#discussion_r13520365
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -119,6 +119,11 @@ object HashFilteredJoin extends Logging with PredicateHelper {
         case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
           logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
           splitPredicates(predicates ++ condition, join)
    +    // All predicates can be evaluated for left semi join (those that are in the WHERE
    +    // clause can only come from the left table, so they can all be pushed down.)
    --- End diff --
    
    I think in general we should avoid making too many assumptions in the planner about what optimizations have occurred.  For example, in the future we might avoid pushing down predicates that are very expensive to evaluate as it might be cheaper to run them on an already filtered set of data.  However, in the case of LEFT SEMI JOIN, I think it is actually okay to push all evaluation into the join condition, even if they only refer to the left table.  Is that true?
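
    The question can be checked against a small model of semi-join semantics (an illustrative sketch, not Spark code). `semiJoin` keeps a left row iff some right row satisfies the condition; filtering the output with a left-only predicate then gives the same result as folding that predicate into the join condition:

    ```scala
    // Toy model of LEFT SEMI JOIN semantics: keep a left row iff some right row
    // satisfies the join condition.
    def semiJoin[L, R](left: Seq[L], right: Seq[R], cond: (L, R) => Boolean): Seq[L] =
      left.filter(l => right.exists(r => cond(l, r)))

    val left = Seq(1, 2, 3, 4)
    val right = Seq(2, 3, 5)
    val keyMatch = (l: Int, r: Int) => l == r
    val leftOnly = (l: Int) => l > 2  // a WHERE-clause predicate on the left table only

    // Filtering the semi-join output vs. folding the predicate into the condition:
    val filtered = semiJoin(left, right, keyMatch).filter(leftOnly)
    val pushed   = semiJoin(left, right, (l: Int, r: Int) => keyMatch(l, r) && leftOnly(l))
    // filtered == pushed == Seq(3): the fold is safe because a left-only predicate
    // is constant across all right rows considered for a given left row.
    ```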



[GitHub] spark pull request: add support for left semi join

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/837#discussion_r13262607
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -144,6 +144,150 @@ case class HashJoin(
      * :: DeveloperApi ::
      */
     @DeveloperApi
    +case class LeftSemiJoinHash(
    +                     leftKeys: Seq[Expression],
    --- End diff --
    
    Indent only 4 spaces here.



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-45526760
  
    Thanks!  I've merged this into 1.0 and master.



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43775013
  
     Merged build triggered. 



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-44927038
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15374/



[GitHub] spark pull request: add support for left semi join

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-44756469
  
    This is getting closer.  Thanks for working on it!



[GitHub] spark pull request: add support for left semi join

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-44756501
  
    Can you add "[SPARK-1495][SQL]" to the PR title?



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-44927036
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: add support for left semi join

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/837#discussion_r13262730
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -144,6 +144,150 @@ case class HashJoin(
      * :: DeveloperApi ::
      */
     @DeveloperApi
    +case class LeftSemiJoinHash(
    +                     leftKeys: Seq[Expression],
    +                     rightKeys: Seq[Expression],
    +                     buildSide: BuildSide,
    +                     left: SparkPlan,
    +                     right: SparkPlan) extends BinaryNode {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +  val (buildPlan, streamedPlan) = buildSide match {
    +    case BuildLeft => (left, right)
    +    case BuildRight => (right, left)
    +  }
    +
    +  val (buildKeys, streamedKeys) = buildSide match {
    +    case BuildLeft => (leftKeys, rightKeys)
    +    case BuildRight => (rightKeys, leftKeys)
    +  }
    +
    +  def output = left.output
    +
    +  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
    +  @transient lazy val streamSideKeyGenerator =
    +    () => new MutableProjection(streamedKeys, streamedPlan.output)
    +
    +  def execute() = {
    +
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
    +    // TODO: Use Spark's HashMap implementation.
    +      val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
    +      var currentRow: Row = null
    +
    +      // Create a mapping of buildKeys -> rows
    +      while (buildIter.hasNext) {
    +        currentRow = buildIter.next()
    +        val rowKey = buildSideKeyGenerator(currentRow)
    +        if(!rowKey.anyNull) {
    +          val existingMatchList = hashTable.get(rowKey)
    +          val matchList = if (existingMatchList == null) {
    +            val newMatchList = new ArrayBuffer[Row]()
    +            hashTable.put(rowKey, newMatchList)
    +            newMatchList
    +          } else {
    +            existingMatchList
    +          }
    +          matchList += currentRow.copy()
    +        }
    +      }
    +
    +      new Iterator[Row] {
    +        private[this] var currentStreamedRow: Row = _
    +        private[this] var currentHashMatched: Boolean = false
    +
    +        private[this] val joinKeys = streamSideKeyGenerator()
    +
    +        override final def hasNext: Boolean =
    +          streamIter.hasNext && fetchNext()
    +
    +        override final def next() = {
    +          currentStreamedRow
    +        }
    +
    +        /**
    +         * Searches the streamed iterator for the next row that has at least one match in hashtable.
    +         *
    +         * @return true if the search is successful, and false if the streamed iterator
    +         *         runs out of tuples.
    +         */
    +        private final def fetchNext(): Boolean = {
    +          currentHashMatched = false
    +          while (!currentHashMatched && streamIter.hasNext) {
    +            currentStreamedRow = streamIter.next()
    +            if (!joinKeys(currentStreamedRow).anyNull) {
    +              currentHashMatched = true
    +            }
    +          }
    +          currentHashMatched
    +        }
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + */
    +@DeveloperApi
    +case class LeftSemiJoinBNL(
    --- End diff --
    
    I don't think this operator is exercised by the included test cases.  We should add a test where the join condition cannot be calculated with hash keys.



[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43737611
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15116/



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-44923039
  
    Hi Michael, when I was adding the Scala doc, I realized that if the join is not planned as a LeftSemiJoinHash, it simply means there are no join keys for the left semi join. Then, if I push down those predicates and conditions (all of them can be pushed down), I only need to check whether the right table is empty to decide whether to output the left table. So LeftSemiJoinBNL would be largely unnecessary. Am I right?
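
    The observation can be phrased concretely (a toy model, not the PR's code): with no join keys and every predicate already pushed down, the residual semi join has an always-true condition, so a left row survives exactly when the right side is non-empty:

    ```scala
    // Toy semi join: keep each left row iff some right row satisfies the condition.
    def semiJoin[L, R](left: Seq[L], right: Seq[R], cond: (L, R) => Boolean): Seq[L] =
      left.filter(l => right.exists(r => cond(l, r)))

    val left = Seq("a", "b")
    // With an always-true condition the result depends only on right's emptiness:
    assert(semiJoin(left, Seq(1, 2), (_: String, _: Int) => true) == left)
    assert(semiJoin(left, Seq.empty[Int], (_: String, _: Int) => true) == Seq.empty)
    ```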



[GitHub] spark pull request: add support for left semi join

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/837#discussion_r13262710
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -144,6 +144,150 @@ case class HashJoin(
      * :: DeveloperApi ::
      */
     @DeveloperApi
    +case class LeftSemiJoinHash(
    +                     leftKeys: Seq[Expression],
    +                     rightKeys: Seq[Expression],
    +                     buildSide: BuildSide,
    +                     left: SparkPlan,
    +                     right: SparkPlan) extends BinaryNode {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +  val (buildPlan, streamedPlan) = buildSide match {
    +    case BuildLeft => (left, right)
    +    case BuildRight => (right, left)
    +  }
    +
    +  val (buildKeys, streamedKeys) = buildSide match {
    +    case BuildLeft => (leftKeys, rightKeys)
    +    case BuildRight => (rightKeys, leftKeys)
    +  }
    +
    +  def output = left.output
    +
    +  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
    +  @transient lazy val streamSideKeyGenerator =
    +    () => new MutableProjection(streamedKeys, streamedPlan.output)
    +
    +  def execute() = {
    +
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
    +    // TODO: Use Spark's HashMap implementation.
    +      val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
    +      var currentRow: Row = null
    +
    +      // Create a mapping of buildKeys -> rows
    +      while (buildIter.hasNext) {
    +        currentRow = buildIter.next()
    +        val rowKey = buildSideKeyGenerator(currentRow)
    +        if(!rowKey.anyNull) {
    +          val existingMatchList = hashTable.get(rowKey)
    +          val matchList = if (existingMatchList == null) {
    +            val newMatchList = new ArrayBuffer[Row]()
    +            hashTable.put(rowKey, newMatchList)
    +            newMatchList
    +          } else {
    +            existingMatchList
    +          }
    +          matchList += currentRow.copy()
    +        }
    +      }
    +
    +      new Iterator[Row] {
    +        private[this] var currentStreamedRow: Row = _
    +        private[this] var currentHashMatched: Boolean = false
    +
    +        private[this] val joinKeys = streamSideKeyGenerator()
    +
    +        override final def hasNext: Boolean =
    +          streamIter.hasNext && fetchNext()
    +
    +        override final def next() = {
    +          currentStreamedRow
    +        }
    +
    +        /**
    +         * Searches the streamed iterator for the next row that has at least one match in hashtable.
    +         *
    +         * @return true if the search is successful, and false if the streamed iterator
    +         *         runs out of tuples.
    +         */
    +        private final def fetchNext(): Boolean = {
    +          currentHashMatched = false
    +          while (!currentHashMatched && streamIter.hasNext) {
    +            currentStreamedRow = streamIter.next()
    +            if (!joinKeys(currentStreamedRow).anyNull) {
    +              currentHashMatched = true
    +            }
    +          }
    +          currentHashMatched
    +        }
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + */
    +@DeveloperApi
    +case class LeftSemiJoinBNL(
    +    streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression])
    +    (@transient sc: SparkContext)
    +  extends BinaryNode {
    +  // TODO: Override requiredChildDistribution.
    +
    +  override def outputPartitioning: Partitioning = streamed.outputPartitioning
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  def output = left.output
    +
    +  /** The Streamed Relation */
    +  def left = streamed
    +  /** The Broadcast relation */
    +  def right = broadcast
    +
    +  @transient lazy val boundCondition =
    +    InterpretedPredicate(
    +      condition
    +        .map(c => BindReferences.bindReference(c, left.output ++ right.output))
    +        .getOrElse(Literal(true)))
    +
    +
    +  def execute() = {
    +    val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
    +
    +    val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter =>
    +      val joinedRow = new JoinedRow
    +
    +      streamedIter.filter(streamedRow => {
    +        var i = 0
    +        var matched = false
    +
    +        while (i < broadcastedRelation.value.size && !matched) {
    +          // TODO: One bitset per partition instead of per row.
    +          val broadcastedRow = broadcastedRelation.value(i)
    +          if (boundCondition(joinedRow(streamedRow, broadcastedRow))) {
    +            matched = true
    +          }
    +          i += 1
    +        }
    +        matched
    +      }).map(streamedRow => (streamedRow, null))
    +    }
    +
    +    streamedPlusMatches.map(_._1)
    --- End diff --
    
    I think you can remove the above tuple creation as well as this line that removes it.
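
    The suggested simplification is just that `filter` already yields the matched rows, so the `(row, null)` pairing and the trailing `map(_._1)` cancel out (an illustrative sketch, not the PR's code):

    ```scala
    // filter(...).map(r => (r, null)).map(_._1) is equivalent to filter(...) alone,
    // and avoids allocating a throwaway tuple per matched row.
    def semiFilter[R](streamed: Iterator[R], matches: R => Boolean): Iterator[R] =
      streamed.filter(matches)

    val kept = semiFilter(Iterator(1, 2, 3, 4), (r: Int) => r % 2 == 0).toList
    // kept == List(2, 4)
    ```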



[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-45418467
  
    I think this is looking pretty good.  One problem is that there are no tests for the nested loop version.  I tried adding this to SQLQuerySuite:
    
    ```scala
      test("left semi greater than predicate") {
        checkAnswer(
          sql("SELECT * FROM testData2 x JOIN testData2 y WHERE x.a >= y.a + 2"),
          Seq((3,1), (3,2))
        )
      }
    ```
    
    However this points out that we need to fix the other join strategies to avoid matching semi joins:
    ```scala
    [info] - left semi greater than predicate *** FAILED *** (174 milliseconds)
    [info]   Results do not match for query:
    ...
    [info] == Physical Plan ==
    [info] Project [a#18:0,b#19:1,a#20:2,b#21:3]
    [info]  Filter (a#18:0 >= (a#20:2 + 2))
    [info]   CartesianProduct 
    [info]    ExistingRdd [a#18,b#19], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174
    [info]    ExistingRdd [a#20,b#21], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174
    [info] 
    [info] == Results ==
    [info] !== Correct Answer - 2 ==   == Spark Answer - 4 ==
    [info] !Vector(3, 1)               [3,1,1,1]
    [info] !Vector(3, 2)               [3,1,1,2]
    [info] !                           [3,2,1,1]
    [info] !                           [3,2,1,2] (QueryTest.scala:54)
    ```


---

[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43737610
  
    Merged build finished. 


---

[GitHub] spark pull request: add support for left semi join

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/837#discussion_r13262640
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -144,6 +144,150 @@ case class HashJoin(
      * :: DeveloperApi ::
    --- End diff --
    
    I realize that we aren't particularly good about this in most of the other physical operators, but could you add some Scala doc here about how this operator works and what the expected performance characteristics are?  Same below.  The goal of the Scala doc for physical operators should be to make it easy for people to understand query plans that are printed out by EXPLAIN.
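    
    As a rough sketch of what such a doc comment might describe, the hash-based semi join boils down to a build phase and a probe phase (plain Scala, with Int keys standing in for projected row keys):
    
    ```scala
    // Plain-Scala sketch of LeftSemiJoinHash, not the operator itself.
    // Build phase: hash the keys of the build side. Probe phase: stream the
    // other side and keep each row whose key appears in the hash, emitting
    // every kept row at most once (no output row multiplication).
    val buildSide  = Seq(1, 2, 3)
    val streamSide = Seq(2, 3, 3, 4)
    
    val keySet = buildSide.toSet                        // build phase
    val result = streamSide.filter(k => keySet(k))      // probe phase
    
    println(result)  // List(2, 3, 3)
    ```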


---

[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-44922835
  
     Merged build triggered. 


---

[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/837


---

[GitHub] spark pull request: add support for left semi join

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43727297
  
    Jenkins, add to whitelist.


---

[GitHub] spark pull request: add support for left semi join

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/837#discussion_r13262704
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -144,6 +144,150 @@ case class HashJoin(
      * :: DeveloperApi ::
      */
     @DeveloperApi
    +case class LeftSemiJoinHash(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +  val (buildPlan, streamedPlan) = buildSide match {
    +    case BuildLeft => (left, right)
    +    case BuildRight => (right, left)
    +  }
    +
    +  val (buildKeys, streamedKeys) = buildSide match {
    +    case BuildLeft => (leftKeys, rightKeys)
    +    case BuildRight => (rightKeys, leftKeys)
    +  }
    +
    +  def output = left.output
    +
    +  @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output)
    +  @transient lazy val streamSideKeyGenerator =
    +    () => new MutableProjection(streamedKeys, streamedPlan.output)
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
    +      // TODO: Use Spark's HashMap implementation.
    +      val hashTable = new java.util.HashMap[Row, ArrayBuffer[Row]]()
    +      var currentRow: Row = null
    +
    +      // Create a mapping of buildKeys -> rows
    +      while (buildIter.hasNext) {
    +        currentRow = buildIter.next()
    +        val rowKey = buildSideKeyGenerator(currentRow)
    +        if (!rowKey.anyNull) {
    +          val existingMatchList = hashTable.get(rowKey)
    +          val matchList = if (existingMatchList == null) {
    +            val newMatchList = new ArrayBuffer[Row]()
    +            hashTable.put(rowKey, newMatchList)
    +            newMatchList
    +          } else {
    +            existingMatchList
    +          }
    +          matchList += currentRow.copy()
    +        }
    +      }
    +
    +      new Iterator[Row] {
    +        private[this] var currentStreamedRow: Row = _
    +        private[this] var currentHashMatched: Boolean = false
    +
    +        private[this] val joinKeys = streamSideKeyGenerator()
    +
    +        override final def hasNext: Boolean =
    +          currentHashMatched || (streamIter.hasNext && fetchNext())
    +
    +        override final def next() = {
    +          currentHashMatched = false
    +          currentStreamedRow
    +        }
    +
    +        /**
    +         * Searches the streamed iterator for the next row that has at least one match in
    +         * the hash table.
    +         *
    +         * @return true if the search is successful, and false if the streamed iterator runs
    +         *         out of tuples.
    +         */
    +        private final def fetchNext(): Boolean = {
    +          currentHashMatched = false
    +          while (!currentHashMatched && streamIter.hasNext) {
    +            currentStreamedRow = streamIter.next()
    +            val rowKey = joinKeys(currentStreamedRow)
    +            if (!rowKey.anyNull && hashTable.containsKey(rowKey)) {
    +              currentHashMatched = true
    +            }
    +          }
    +          currentHashMatched
    +        }
    +      }
    +    }
    +  }
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + */
    +@DeveloperApi
    +case class LeftSemiJoinBNL(
    +    streamed: SparkPlan, broadcast: SparkPlan, condition: Option[Expression])
    +    (@transient sc: SparkContext)
    +  extends BinaryNode {
    +  // TODO: Override requiredChildDistribution.
    +
    +  override def outputPartitioning: Partitioning = streamed.outputPartitioning
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  def output = left.output
    +
    +  /** The Streamed Relation */
    +  def left = streamed
    +  /** The Broadcast relation */
    +  def right = broadcast
    +
    +  @transient lazy val boundCondition =
    +    InterpretedPredicate(
    +      condition
    +        .map(c => BindReferences.bindReference(c, left.output ++ right.output))
    +        .getOrElse(Literal(true)))
    +
    +
    +  def execute() = {
    +    val broadcastedRelation = sc.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
    +
    +    val streamedPlusMatches = streamed.execute().mapPartitions { streamedIter =>
    +      val joinedRow = new JoinedRow
    +
    +      streamedIter.filter(streamedRow => {
    +        var i = 0
    +        var matched = false
    +
    +        while (i < broadcastedRelation.value.size && !matched) {
    +          // TODO: One bitset per partition instead of per row.
    --- End diff --
    
    Is this comment stale?
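    
    For context, the broadcast-nested-loop variant in the diff above amounts to the following (plain Scala, with Int rows and a stand-in predicate; not the operator itself):
    
    ```scala
    // Plain-Scala sketch of LeftSemiJoinBNL: for every streamed row, scan
    // the broadcast relation until one row satisfies the join condition;
    // keep the streamed row iff such a match exists.
    val streamed    = Seq(1, 2, 3, 4)
    val broadcasted = Seq(3, 4, 5)
    
    def condition(s: Int, b: Int): Boolean = s >= b  // stand-in predicate
    
    val kept = streamed.filter(s => broadcasted.exists(b => condition(s, b)))
    
    println(kept)  // List(3, 4)
    ```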


---

[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43727534
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15115/


---

[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/837#discussion_r13522951
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -119,6 +119,11 @@ object HashFilteredJoin extends Logging with PredicateHelper {
         case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
           logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
           splitPredicates(predicates ++ condition, join)
    +    // All predicates can be evaluated for left semi join (those that are in the WHERE
    +    // clause can only reference the left table, so they can all be pushed down).
    --- End diff --
    
    Yes, I think a LEFT SEMI JOIN would not be affected by pushing down predicates.
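    
    The equivalence can be illustrated in plain Scala (a sketch with Int rows, not Catalyst itself): because a semi join emits only left columns, filtering the left side before or after the join gives the same result.
    
    ```scala
    // Sketch of why pushing a left-table predicate below a left semi join is
    // safe: the output contains only left rows, so the filter commutes with
    // the join.
    val left  = Seq(1, 2, 3, 4)
    val right = Seq(2, 3)
    
    def semi(l: Seq[Int], r: Seq[Int]): Seq[Int] = l.filter(x => r.contains(x))
    
    val filterAfter  = semi(left, right).filter(_ > 2)   // join, then filter
    val filterBefore = semi(left.filter(_ > 2), right)   // pushed-down filter
    
    println(filterAfter == filterBefore)  // true
    ```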


---

[GitHub] spark pull request: add support for left semi join

Posted by adrian-wang <gi...@git.apache.org>.
Github user adrian-wang commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43605087
  
    This is a solution based on #418 from @marmbrus .


---

[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43727343
  
    Merged build started. 


---

[GitHub] spark pull request: [SPARK-1495][SQL]add support for left semi joi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-45429745
  
     Merged build triggered. 


---

[GitHub] spark pull request: add support for left semi join

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/837#issuecomment-43727320
  
     Merged build triggered. 


---