Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2016/02/09 08:40:58 UTC

[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/11130

    [SPARK-13237] [SQL] generated broadcast outer join

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark gen_out

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11130
    
----
commit 9c09b6c103833c8e3113c706a787da67b8b3e66b
Author: Davies Liu <da...@databricks.com>
Date:   2016-02-09T00:33:44Z

    generated broadcast outer join

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181752868
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50961/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184468197
  
    **[Test build #51329 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51329/consoleFull)** for PR 11130 at commit [`5724180`](https://github.com/apache/spark/commit/57241806ae31130429cb68a58a4086f15c3965f4).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181982066
  
    **[Test build #2527 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2527/consoleFull)** for PR 11130 at commit [`52efe91`](https://github.com/apache/spark/commit/52efe91168a4be7ce721d2f56e2b1e7aab9379db).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181752864
  
    **[Test build #50961 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50961/consoleFull)** for PR 11130 at commit [`52efe91`](https://github.com/apache/spark/commit/52efe91168a4be7ce721d2f56e2b1e7aab9379db).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838281
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    --- End diff --
    
    nitpick
    ```scala
    streamedIter.flatMap { currentRow =>
      // ...
    }
    ```




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52399175
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala ---
    @@ -165,4 +183,73 @@ trait HashJoin {
           }
         }
       }
    +
    +  @transient protected[this] lazy val EMPTY_LIST = CompactBuffer[InternalRow]()
    +
    +  @transient private[this] lazy val leftNullRow = new GenericInternalRow(left.output.length)
    +  @transient private[this] lazy val rightNullRow = new GenericInternalRow(right.output.length)
    +
    +  protected[this] def leftOuterIterator(
    +    key: InternalRow,
    +    joinedRow: JoinedRow,
    +    rightIter: Iterable[InternalRow],
    +    resultProjection: InternalRow => InternalRow,
    +    numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
    +    val ret: Iterable[InternalRow] = {
    +      if (!key.anyNull) {
    +        val temp = if (rightIter != null) {
    +          rightIter.collect {
    +            case r if boundCondition(joinedRow.withRight(r)) => {
    +              numOutputRows += 1
    +              resultProjection(joinedRow).copy()
    +            }
    +          }
    +        } else {
    +          List.empty
    +        }
    +        if (temp.isEmpty) {
    +          numOutputRows += 1
    +          resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
    +        } else {
    +          temp
    +        }
    +      } else {
    +        numOutputRows += 1
    +        resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
    +      }
    +    }
    +    ret.iterator
    +  }
    +
    +  protected[this] def rightOuterIterator(
    +    key: InternalRow,
    +    leftIter: Iterable[InternalRow],
    +    joinedRow: JoinedRow,
    +    resultProjection: InternalRow => InternalRow,
    +    numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
    +    val ret: Iterable[InternalRow] = {
    --- End diff --
    
    indenting
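
For context, the hunk above gives the wrapped parameters of `leftOuterIterator` and `rightOuterIterator` the same indent as the method body. A minimal sketch of the layout this comment presumably asks for, assuming the Spark Scala style convention of a four-space indent for wrapped parameters and a two-space indent for the body. `IndentSketch`, `Row`, and `leftOuterRows` are hypothetical stand-ins and do not use Spark's `InternalRow` or `JoinedRow`:

```scala
object IndentSketch {
  // Simplified stand-in for a row; not Spark's InternalRow.
  type Row = Seq[Any]

  // Wrapped parameters get a four-space indent and the body a two-space
  // indent, so the parameter list and the method body stay visually distinct.
  def leftOuterRows(
      left: Row,
      matches: Iterable[Row],
      rightWidth: Int): Iterator[Row] = {
    // Left outer join: a left row with no match is padded with nulls.
    val nullRight: Row = Seq.fill(rightWidth)(null)
    if (matches.isEmpty) {
      Iterator(left ++ nullRight)
    } else {
      matches.iterator.map(left ++ _)
    }
  }
}
```

Calling `IndentSketch.leftOuterRows(Seq(1), Nil, 2)` yields a single row padded with two nulls, which is the left-outer behaviour the quoted method implements for an unmatched key.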




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182039006
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50992/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181752129
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50960/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838460
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withLeft(currentRow)
    +            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
    +          })
    +
    +        case RightOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withRight(currentRow)
    +            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
    +          })
    +
    +        case x =>
    +          throw new IllegalArgumentException(
    +            s"BroadcastHashJoin should not take $x as the JoinType")
    +      }
         }
       }
     
    -  private var broadcastRelation: Broadcast[HashedRelation] = _
    -  // the term for hash relation
    -  private var relationTerm: String = _
    -
       override def upstream(): RDD[InternalRow] = {
         streamedPlan.asInstanceOf[CodegenSupport].upstream()
       }
     
       override def doProduce(ctx: CodegenContext): String = {
    +    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    if (joinType == Inner) {
    +      codegenInner(ctx, input)
    +    } else {
    +      // LeftOuter and RightOuter
    +      codegenOuter(ctx, input)
    +    }
    +  }
    +
    +  private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
         // create a name for HashedRelation
    -    broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
         val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    -    relationTerm = ctx.freshName("relation")
    +    val relationTerm = ctx.freshName("relation")
         val clsName = broadcastRelation.value.getClass.getName
         ctx.addMutableState(clsName, relationTerm,
           s"""
              | $relationTerm = ($clsName) $broadcast.value();
              | incPeakExecutionMemory($relationTerm.getMemorySize());
            """.stripMargin)
    -
    -    s"""
    -       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    -     """.stripMargin
    +    (broadcastRelation, relationTerm)
       }
     
    -  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    -    // generate the key as UnsafeRow or Long
    +  private def genJoinKey(ctx: CodegenContext, input: Seq[ExprCode]): (ExprCode, String) = {
         ctx.currentVars = input
    -    val (keyVal, anyNull) = if (canJoinKeyFitWithinLong) {
    +    if (canJoinKeyFitWithinLong) {
    +      // generate the join key as Long
           val expr = rewriteKeyExpr(streamedKeys).head
           val ev = BindReferences.bindReference(expr, streamedPlan.output).gen(ctx)
           (ev, ev.isNull)
         } else {
    +      // generate the join key as UnsafeRow
           val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
           val ev = GenerateUnsafeProjection.createCode(ctx, keyExpr)
           (ev, s"${ev.value}.anyNull()")
         }
    +  }
     
    -    // find the matches from HashedRelation
    -    val matched = ctx.freshName("matched")
    -
    -    // create variables for output
    +  private def genBuildSideVars(ctx: CodegenContext, matched: String): Seq[ExprCode] = {
         ctx.currentVars = null
         ctx.INPUT_ROW = matched
    -    val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    -      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +    buildPlan.output.zipWithIndex.map { case (a, i) =>
    +      val ev = BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      if (joinType == Inner) {
    +        ev
    +      } else {
    +        // the variables are needed even there is no matched rows
    --- End diff --
    
    I'm really confused by this. What is this doing?
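
As a reading aid for the question above: going by the revision of this hunk quoted in nongli's diff comment in this thread, the `else` branch appears to wrap each build-side column so that its `isNull` and `value` variables are declared with defaults before the `if ($matched != null)` check, because an outer join still emits a row when nothing matched. A minimal sketch of that string-building step with no Spark dependency. `OuterJoinVarsSketch`, `Expr`, and `wrapForOuterJoin` are hypothetical names, and the Java type and default value are passed in as plain strings rather than derived from a `DataType`:

```scala
object OuterJoinVarsSketch {
  // Simplified stand-in for Spark's ExprCode: generated Java source plus the
  // names of the variables that hold the null flag and the value.
  final case class Expr(code: String, isNull: String, value: String)

  // Hypothetical helper: declares the column as NULL by default and only
  // evaluates `ev` when a matched build-side row exists.
  def wrapForOuterJoin(
      matched: String,
      javaType: String,
      defaultValue: String,
      ev: Expr,
      isNull: String,
      value: String): Expr = {
    val code =
      s"""
         |boolean $isNull = true;
         |$javaType $value = $defaultValue;
         |if ($matched != null) {
         |  ${ev.code}
         |  $isNull = ${ev.isNull};
         |  $value = ${ev.value};
         |}
       """.stripMargin.trim
    Expr(code, isNull, value)
  }
}
```

The returned `Expr` exposes the outer variable names, so downstream generated code can read the column whether or not `matched` was null.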




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-183516389
  
    cc @rxin 




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-185975528
  
    @rxin @nongli I'm merging this to unblock other codegen/exchange/join work. Please take a final look at this; any comments will be addressed in follow-up PRs.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-183472117
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51194/
    Test PASSed.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-183446327
  
    **[Test build #51194 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51194/consoleFull)** for PR 11130 at commit [`9b05c7c`](https://github.com/apache/spark/commit/9b05c7cd335f06079c241f681282bd36306dc739).




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182149063
  
    I still see initRange() being generated as an empty function. Do we still need this?




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-183471826
  
    **[Test build #51194 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51194/consoleFull)** for PR 11130 at commit [`9b05c7c`](https://github.com/apache/spark/commit/9b05c7cd335f06079c241f681282bd36306dc739).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class KMeansModel(JavaModel, MLWritable, MLReadable):`
      * `class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol, HasSeed,`
      * `class BisectingKMeansModel(JavaModel):`
      * `class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasSeed):`
      * `class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed,`
      * `class ALSModel(JavaModel, MLWritable, MLReadable):`
      * `case class Grouping(child: Expression) extends Expression with Unevaluable `
      * `case class GroupingID(groupByExprs: Seq[Expression]) extends Expression with Unevaluable `
      * `class ContinuousQueryManager(sqlContext: SQLContext) `
      * `class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)`
      * `class FileStreamSource(`
      * `trait HadoopFsRelationProvider extends StreamSourceProvider `
      * `abstract class ContinuousQueryListener `




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838383
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withLeft(currentRow)
    +            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
    +          })
    +
    +        case RightOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withRight(currentRow)
    +            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
    +          })
    +
    +        case x =>
    +          throw new IllegalArgumentException(
    +            s"BroadcastHashJoin should not take $x as the JoinType")
    +      }
         }
       }
     
    -  private var broadcastRelation: Broadcast[HashedRelation] = _
    -  // the term for hash relation
    -  private var relationTerm: String = _
    -
       override def upstream(): RDD[InternalRow] = {
         streamedPlan.asInstanceOf[CodegenSupport].upstream()
       }
     
       override def doProduce(ctx: CodegenContext): String = {
    +    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    if (joinType == Inner) {
    +      codegenInner(ctx, input)
    +    } else {
    +      // LeftOuter and RightOuter
    +      codegenOuter(ctx, input)
    +    }
    +  }
    +
    +  private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
         // create a name for HashedRelation
    -    broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
         val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    -    relationTerm = ctx.freshName("relation")
    +    val relationTerm = ctx.freshName("relation")
         val clsName = broadcastRelation.value.getClass.getName
         ctx.addMutableState(clsName, relationTerm,
           s"""
              | $relationTerm = ($clsName) $broadcast.value();
              | incPeakExecutionMemory($relationTerm.getMemorySize());
            """.stripMargin)
    -
    -    s"""
    -       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    -     """.stripMargin
    +    (broadcastRelation, relationTerm)
       }
     
    -  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    -    // generate the key as UnsafeRow or Long
    +  private def genJoinKey(ctx: CodegenContext, input: Seq[ExprCode]): (ExprCode, String) = {
    --- End diff --
    
    actually this one especially, because the string is not the term, but rather the term for isNull.
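
One way to make such a return value self-describing, sketched under assumptions: in the quoted hunk `genJoinKey` returns `(ExprCode, String)`, and the string is an expression that tests the key for null rather than a variable holding the key. A small named result type can state that at the call site. `JoinKeySketch`, `KeyExpr`, `JoinKeyCode`, and `genJoinKeySketch` are hypothetical and not part of the PR:

```scala
object JoinKeySketch {
  // Simplified stand-in for Spark's ExprCode.
  final case class KeyExpr(code: String, isNull: String, value: String)

  /**
   * @param key         generated code and variable names for the join key
   * @param anyNullExpr Java boolean expression that is true when the key contains a null
   */
  final case class JoinKeyCode(key: KeyExpr, anyNullExpr: String)

  def genJoinKeySketch(ev: KeyExpr, keyFitsInLong: Boolean): JoinKeyCode = {
    if (keyFitsInLong) {
      // A single long key: the expression's own null flag is enough.
      JoinKeyCode(ev, ev.isNull)
    } else {
      // A row-typed key: ask the row whether any field is null.
      JoinKeyCode(ev, s"${ev.value}.anyNull()")
    }
  }
}
```

A caller would then read `result.anyNullExpr` instead of the second element of a tuple, which leaves less room to mistake it for a term name.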





[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181752128
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52399766
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -163,64 +192,142 @@ case class BroadcastHashJoin(
     
         // find the matches from HashedRelation
         val matched = ctx.freshName("matched")
    +    val valid = ctx.freshName("invalid")
     
         // create variables for output
         ctx.currentVars = null
         ctx.INPUT_ROW = matched
         val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    -      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      val ev = BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      if (joinType == Inner) {
    +        ev
    +      } else {
    +        val isNull = ctx.freshName("isNull")
    +        val value = ctx.freshName("value")
    +        val code = s"""
    +          |boolean $isNull = true;
    +          |${ctx.javaType(a.dataType)} $value = ${ctx.defaultValue(a.dataType)};
    +          |if ($matched != null) {
    +          |  ${ev.code}
    +          |  $isNull = ${ev.isNull};
    +          |  $value = ${ev.value};
    +          |}
    +         """.stripMargin
    +        ExprCode(code, isNull, value)
    +      }
         }
    +
    +    // output variables
         val resultVars = buildSide match {
           case BuildLeft => buildColumns ++ input
           case BuildRight => input ++ buildColumns
         }
     
    -    val outputCode = if (condition.isDefined) {
    -      // filter the output via condition
    -      ctx.currentVars = resultVars
    -      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    -      s"""
    -         | ${ev.code}
    -         | if (!${ev.isNull} && ${ev.value}) {
    -         |   ${consume(ctx, resultVars)}
    -         | }
    -       """.stripMargin
    -    } else {
    -      consume(ctx, resultVars)
    -    }
    +    if (joinType == Inner) {
    +      val outputCode = if (condition.isDefined) {
    +        // filter the output via condition
    +        ctx.currentVars = resultVars
    +        val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +        s"""
    +         |${ev.code}
    +         |if (!${ev.isNull} && ${ev.value}) {
    +         |  ${consume(ctx, resultVars)}
    +         |}
    +         """.stripMargin
    +      } else {
    +        consume(ctx, resultVars)
    +      }
     
    -    if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    -      s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashedRelation
    -         | UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    -         | if ($matched != null) {
    -         |   ${buildColumns.map(_.code).mkString("\n")}
    -         |   $outputCode
    -         | }
    -     """.stripMargin
    +      if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    +         |if ($matched != null) {
    +         |  ${buildColumns.map(_.code).mkString("\n")}
    +         |  $outputCode
    +         |}
    +         """.stripMargin
    +
    +      } else {
    +        val matches = ctx.freshName("matches")
    +        val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +        val i = ctx.freshName("i")
    +        val size = ctx.freshName("size")
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashRelation
    +         |$bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get(${keyVal.value});
    +         |if ($matches != null) {
    +         |  int $size = $matches.size();
    +         |  for (int $i = 0; $i < $size; $i++) {
    +         |    UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    +         |    ${buildColumns.map(_.code).mkString("\n")}
    +         |    $outputCode
    +         |  }
    +         |}
    +         """.stripMargin
    +      }
     
         } else {
    -      val matches = ctx.freshName("matches")
    -      val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    -      val i = ctx.freshName("i")
    -      val size = ctx.freshName("size")
    -      s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashRelation
    -         | $bufferType $matches = ${anyNull} ? null :
    -         |  ($bufferType) $relationTerm.get(${keyVal.value});
    -         | if ($matches != null) {
    -         |   int $size = $matches.size();
    -         |   for (int $i = 0; $i < $size; $i++) {
    -         |     UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    -         |     ${buildColumns.map(_.code).mkString("\n")}
    -         |     $outputCode
    -         |   }
    -         | }
    -     """.stripMargin
    +      // LeftOuter and RightOuter
    +
    +      // filter the output via condition
    +      val checkCondition = if (condition.isDefined) {
    +        ctx.currentVars = resultVars
    +        val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +        s"""
    +         |boolean $valid = true;
    +         |if ($matched != null) {
    +         |  ${ev.code}
    +         |  $valid = !${ev.isNull} && ${ev.value};
    +         |}
    +         """.stripMargin
    +      } else {
    +        s"final boolean $valid = true;"
    +      }
    +
    +      if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    +         |${buildColumns.map(_.code).mkString("\n")}
    +         |${checkCondition.trim}
    +         |if (!$valid) {
    +         |  // reset to null
    +         |  ${buildColumns.map(v => s"${v.isNull} = true;").mkString("\n")}
    --- End diff --
    
    Is this right? The most intuitive thing to generate here, IMO, is a `continue`. Does that work?
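
    For context on the question above, here is a minimal Java sketch of how the two options might differ for a left outer join. It assumes a unique build-side key and a plain `HashMap` standing in for the hashed relation. The names `OuterJoinSketch`, `leftOuter`, and `useContinue` are hypothetical and do not appear in the patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical illustration only; this is not the code generated by the patch.
class OuterJoinSketch {

    // Joins each streamed key against a unique-key build map (left outer semantics).
    // useContinue = true skips rows whose join condition fails;
    // useContinue = false resets the build side to null and still emits the row.
    static List<String> leftOuter(List<Integer> streamed,
                                  Map<Integer, String> build,
                                  BiPredicate<Integer, String> condition,
                                  boolean useContinue) {
        List<String> out = new ArrayList<>();
        for (Integer key : streamed) {
            String matched = build.get(key);      // null when there is no match
            boolean conditionPassed = true;
            if (matched != null) {
                conditionPassed = condition.test(key, matched);
            }
            if (!conditionPassed) {
                if (useContinue) {
                    continue;                     // drops the streamed row entirely
                }
                matched = null;                   // reset to null, keep the streamed row
            }
            out.add(key + ":" + matched);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> build = new HashMap<>();
        build.put(1, "a");
        build.put(2, "b");
        List<Integer> streamed = Arrays.asList(1, 2, 3);
        BiPredicate<Integer, String> notB = (k, v) -> !v.equals("b");

        System.out.println(leftOuter(streamed, build, notB, false)); // [1:a, 2:null, 3:null]
        System.out.println(leftOuter(streamed, build, notB, true));  // [1:a, 3:null]
    }
}
```

    With the sample data in `main`, the reset variant keeps key 2 with a null build side, while the `continue` variant drops that row, which is inner-join behaviour for it.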


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182543905
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182025607
  
    **[Test build #50982 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50982/consoleFull)** for PR 11130 at commit [`9a1f532`](https://github.com/apache/spark/commit/9a1f5325e954d8464d28ebf415c9dca665e15d35).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182150377
  
    **[Test build #2528 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2528/consoleFull)** for PR 11130 at commit [`edbc284`](https://github.com/apache/spark/commit/edbc284921281358a38b300218ff288c33cdc3b4).


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838317
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withLeft(currentRow)
    +            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
    +          })
    +
    +        case RightOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withRight(currentRow)
    +            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
    +          })
    +
    +        case x =>
    +          throw new IllegalArgumentException(
    +            s"BroadcastHashJoin should not take $x as the JoinType")
    +      }
         }
       }
     
    -  private var broadcastRelation: Broadcast[HashedRelation] = _
    -  // the term for hash relation
    -  private var relationTerm: String = _
    -
       override def upstream(): RDD[InternalRow] = {
         streamedPlan.asInstanceOf[CodegenSupport].upstream()
       }
     
       override def doProduce(ctx: CodegenContext): String = {
    +    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    if (joinType == Inner) {
    +      codegenInner(ctx, input)
    +    } else {
    +      // LeftOuter and RightOuter
    +      codegenOuter(ctx, input)
    +    }
    +  }
    +
    +  private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
         // create a name for HashedRelation
    -    broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
         val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    -    relationTerm = ctx.freshName("relation")
    +    val relationTerm = ctx.freshName("relation")
         val clsName = broadcastRelation.value.getClass.getName
         ctx.addMutableState(clsName, relationTerm,
           s"""
              | $relationTerm = ($clsName) $broadcast.value();
              | incPeakExecutionMemory($relationTerm.getMemorySize());
            """.stripMargin)
    -
    -    s"""
    -       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    -     """.stripMargin
    +    (broadcastRelation, relationTerm)
       }
     
    -  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    -    // generate the key as UnsafeRow or Long
    +  private def genJoinKey(ctx: CodegenContext, input: Seq[ExprCode]): (ExprCode, String) = {
    --- End diff --
    
    Please document this one too (what the fields in the returned tuple mean).


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182719963
  
    @nongli Could you take another look?


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182605016
  
    **[Test build #2531 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2531/consoleFull)** for PR 11130 at commit [`da45df1`](https://github.com/apache/spark/commit/da45df1536f112a14bfe15d6d30d307cdbd99d5b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181995461
  
    **[Test build #50982 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50982/consoleFull)** for PR 11130 at commit [`9a1f532`](https://github.com/apache/spark/commit/9a1f5325e954d8464d28ebf415c9dca665e15d35).


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181981559
  
    **[Test build #2527 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2527/consoleFull)** for PR 11130 at commit [`52efe91`](https://github.com/apache/spark/commit/52efe91168a4be7ce721d2f56e2b1e7aab9379db).


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182543908
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51046/


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182025801
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50982/


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184472795
  
    **[Test build #51333 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51333/consoleFull)** for PR 11130 at commit [`5744941`](https://github.com/apache/spark/commit/5744941063ba05b07e4a7265277162c331a9c48c).


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838756
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    --- End diff --
    
    These were copied and pasted here. I usually don't modify such code, to reduce the review burden.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838309
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withLeft(currentRow)
    +            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
    +          })
    +
    +        case RightOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withRight(currentRow)
    +            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
    +          })
    +
    +        case x =>
    +          throw new IllegalArgumentException(
    +            s"BroadcastHashJoin should not take $x as the JoinType")
    +      }
         }
       }
     
    -  private var broadcastRelation: Broadcast[HashedRelation] = _
    -  // the term for hash relation
    -  private var relationTerm: String = _
    -
       override def upstream(): RDD[InternalRow] = {
         streamedPlan.asInstanceOf[CodegenSupport].upstream()
       }
     
       override def doProduce(ctx: CodegenContext): String = {
    +    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    if (joinType == Inner) {
    +      codegenInner(ctx, input)
    +    } else {
    +      // LeftOuter and RightOuter
    +      codegenOuter(ctx, input)
    +    }
    +  }
    +
    +  private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
    --- End diff --
    
    Please document what the fields in the returned tuple mean.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838767
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    --- End diff --
    
    Sure. Thanks for being considerate. Can you just fix them while you are at it now?



[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52399640
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -163,64 +192,142 @@ case class BroadcastHashJoin(
     
         // find the matches from HashedRelation
         val matched = ctx.freshName("matched")
    +    val valid = ctx.freshName("invalid")
    --- End diff --
    
    Perhaps rename this to `passesFilters` or something like that.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182169216
  
    **[Test build #2528 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2528/consoleFull)** for PR 11130 at commit [`edbc284`](https://github.com/apache/spark/commit/edbc284921281358a38b300218ff288c33cdc3b4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182149913
  
    @nongli I removed that manually


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184570209
  
    ping @rxin 


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838672
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -184,36 +240,106 @@ case class BroadcastHashJoin(
     
         if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
           s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashedRelation
    -         | UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    -         | if ($matched != null) {
    -         |   ${buildColumns.map(_.code).mkString("\n")}
    -         |   $outputCode
    -         | }
    -     """.stripMargin
    +         |// generate join key
    +         |${keyEv.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
    +         |if ($matched != null) {
    +         |  ${buildVars.map(_.code).mkString("\n")}
    +         |  $outputCode
    +         |}
    +       """.stripMargin
    +
    +    } else {
    +      val matches = ctx.freshName("matches")
    +      val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +      val i = ctx.freshName("i")
    +      val size = ctx.freshName("size")
    +      s"""
    +         |// generate join key
    +         |${keyEv.code}
    +         |// find matches from HashRelation
    +         |$bufferType $matches = $anyNull ? null : ($bufferType)$relationTerm.get(${keyEv.value});
    +         |if ($matches != null) {
    +         |  int $size = $matches.size();
    +         |  for (int $i = 0; $i < $size; $i++) {
    +         |    UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    +         |    ${buildVars.map(_.code).mkString("\n")}
    +         |    $outputCode
    +         |  }
    +         |}
    +       """.stripMargin
    +    }
    +  }
    +
    +
    +  private def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
    +    val (keyEv, anyNull) = genJoinKey(ctx, input)
    +    val matched = ctx.freshName("matched")
    +    val buildVars = genBuildSideVars(ctx, matched)
    +    val resultVars = buildSide match {
    +      case BuildLeft => buildVars ++ input
    +      case BuildRight => input ++ buildVars
    +    }
    +    val numOutput = metricTerm(ctx, "numOutputRows")
    +
    +    // filter the output via condition
    +    val conditionPassed = ctx.freshName("conditionPassed")
    +    val checkCondition = if (condition.isDefined) {
    +      ctx.currentVars = resultVars
    +      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +      s"""
    +         |boolean $conditionPassed = true;
    +         |if ($matched != null) {
    +         |  ${ev.code}
    +         |  $conditionPassed = !${ev.isNull} && ${ev.value};
    +         |}
    +       """.stripMargin
    +    } else {
    +      s"final boolean $conditionPassed = true;"
    +    }
    +
    +    if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    +      s"""
    +         |// generate join key
    +         |${keyEv.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
    +         |${buildVars.map(_.code).mkString("\n")}
    +         |${checkCondition.trim}
    +         |if (!$conditionPassed) {
    +         |  // reset to null
    +         |  ${buildVars.map(v => s"${v.isNull} = true;").mkString("\n")}
    +         |}
    +         |$numOutput.add(1);
    +         |${consume(ctx, resultVars)}
    +       """.stripMargin
     
         } else {
           val matches = ctx.freshName("matches")
           val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
           val i = ctx.freshName("i")
           val size = ctx.freshName("size")
    +      val found = ctx.freshName("found")
           s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashRelation
    -         | $bufferType $matches = ${anyNull} ? null :
    -         |  ($bufferType) $relationTerm.get(${keyVal.value});
    -         | if ($matches != null) {
    -         |   int $size = $matches.size();
    -         |   for (int $i = 0; $i < $size; $i++) {
    -         |     UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    -         |     ${buildColumns.map(_.code).mkString("\n")}
    -         |     $outputCode
    -         |   }
    -         | }
    -     """.stripMargin
    +         |// generate join key
    +         |${keyEv.code}
    +         |// find matches from HashRelation
    +         |$bufferType $matches = $anyNull ? null : ($bufferType)$relationTerm.get(${keyEv.value});
    +         |int $size = $matches != null ? $matches.size() : 0;
    +         |boolean $found = false;
    +         |for (int $i = 0; $i <= $size; $i++) {
    --- End diff --
    
    This is clever, but I think you need to document it (i.e., you are adding an extra iteration at the end of the loop to handle the null, no-match row).
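
The quoted diff stops at the loop header, so the loop body is not visible in this message. As a minimal sketch of the extra-iteration idea only, assuming plain strings for rows and a `BiPredicate` for the join condition (the class name, method name, and row representation are all hypothetical and are not the code Spark generates), the pattern could look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical illustration of the "i <= size" trick; not Spark's generated code.
final class OuterJoinLoopSketch {

    /**
     * Joins one streamed row against the build-side rows that share its key.
     * `matches` is null when the key is null or absent from the hash relation.
     */
    static List<String> joinRow(
            String streamed, List<String> matches, BiPredicate<String, String> condition) {
        List<String> out = new ArrayList<>();
        int size = matches != null ? matches.size() : 0;
        boolean found = false;
        // Note the <=: the pass with i == size is the extra iteration. It emits
        // the null-padded row, but only if no real match was emitted before it.
        for (int i = 0; i <= size; i++) {
            String matched = i < size ? matches.get(i) : null;
            if (matched != null) {
                if (!condition.test(streamed, matched)) {
                    continue; // this build row fails the join condition
                }
                found = true;
            } else if (found) {
                continue; // extra pass, but a real match was already emitted
            }
            out.add(streamed + " | " + (matched == null ? "NULL" : matched));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(joinRow("s1", Arrays.asList("b1", "b2"), (s, b) -> true));
        System.out.println(joinRow("s2", null, (s, b) -> true));
        System.out.println(joinRow("s3", Arrays.asList("b1"), (s, b) -> false));
    }
}
```

The single loop covers three cases without a separate branch after it: every matching build row is emitted, a streamed row with no candidates is emitted once with a null build side, and a streamed row whose candidates all fail the condition is also emitted once with a null build side.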


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184495757
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182025797
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838552
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withLeft(currentRow)
    +            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
    +          })
    +
    +        case RightOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withRight(currentRow)
    +            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
    +          })
    +
    +        case x =>
    +          throw new IllegalArgumentException(
    +            s"BroadcastHashJoin should not take $x as the JoinType")
    +      }
         }
       }
     
    -  private var broadcastRelation: Broadcast[HashedRelation] = _
    -  // the term for hash relation
    -  private var relationTerm: String = _
    -
       override def upstream(): RDD[InternalRow] = {
         streamedPlan.asInstanceOf[CodegenSupport].upstream()
       }
     
       override def doProduce(ctx: CodegenContext): String = {
    +    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    if (joinType == Inner) {
    +      codegenInner(ctx, input)
    +    } else {
    +      // LeftOuter and RightOuter
    +      codegenOuter(ctx, input)
    +    }
    +  }
    +
    +  private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
         // create a name for HashedRelation
    -    broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
         val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    -    relationTerm = ctx.freshName("relation")
    +    val relationTerm = ctx.freshName("relation")
         val clsName = broadcastRelation.value.getClass.getName
         ctx.addMutableState(clsName, relationTerm,
           s"""
              | $relationTerm = ($clsName) $broadcast.value();
              | incPeakExecutionMemory($relationTerm.getMemorySize());
            """.stripMargin)
    -
    -    s"""
    -       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    -     """.stripMargin
    +    (broadcastRelation, relationTerm)
       }
     
    -  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    -    // generate the key as UnsafeRow or Long
    +  private def genJoinKey(ctx: CodegenContext, input: Seq[ExprCode]): (ExprCode, String) = {
         ctx.currentVars = input
    -    val (keyVal, anyNull) = if (canJoinKeyFitWithinLong) {
    +    if (canJoinKeyFitWithinLong) {
    +      // generate the join key as Long
           val expr = rewriteKeyExpr(streamedKeys).head
           val ev = BindReferences.bindReference(expr, streamedPlan.output).gen(ctx)
           (ev, ev.isNull)
         } else {
    +      // generate the join key as UnsafeRow
           val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
           val ev = GenerateUnsafeProjection.createCode(ctx, keyExpr)
           (ev, s"${ev.value}.anyNull()")
         }
    +  }
     
    -    // find the matches from HashedRelation
    -    val matched = ctx.freshName("matched")
    -
    -    // create variables for output
    +  private def genBuildSideVars(ctx: CodegenContext, matched: String): Seq[ExprCode] = {
         ctx.currentVars = null
         ctx.INPUT_ROW = matched
    -    val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    -      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +    buildPlan.output.zipWithIndex.map { case (a, i) =>
    +      val ev = BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      if (joinType == Inner) {
    +        ev
    +      } else {
    +        // the variables are needed even there is no matched rows
    --- End diff --
    
    OK, never mind. I think I get it now.



[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52399113
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala ---
    @@ -165,4 +183,73 @@ trait HashJoin {
           }
         }
       }
    +
    +  @transient protected[this] lazy val EMPTY_LIST = CompactBuffer[InternalRow]()
    +
    +  @transient private[this] lazy val leftNullRow = new GenericInternalRow(left.output.length)
    +  @transient private[this] lazy val rightNullRow = new GenericInternalRow(right.output.length)
    +
    +  protected[this] def leftOuterIterator(
    +    key: InternalRow,
    +    joinedRow: JoinedRow,
    +    rightIter: Iterable[InternalRow],
    +    resultProjection: InternalRow => InternalRow,
    +    numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
    +    val ret: Iterable[InternalRow] = {
    +      if (!key.anyNull) {
    --- End diff --
    
    Please fix the indentation here.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52400364
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -163,64 +192,142 @@ case class BroadcastHashJoin(
     
         // find the matches from HashedRelation
         val matched = ctx.freshName("matched")
    +    val valid = ctx.freshName("invalid")
     
         // create variables for output
         ctx.currentVars = null
         ctx.INPUT_ROW = matched
         val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    -      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      val ev = BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      if (joinType == Inner) {
    +        ev
    +      } else {
    +        val isNull = ctx.freshName("isNull")
    +        val value = ctx.freshName("value")
    +        val code = s"""
    +          |boolean $isNull = true;
    +          |${ctx.javaType(a.dataType)} $value = ${ctx.defaultValue(a.dataType)};
    +          |if ($matched != null) {
    +          |  ${ev.code}
    +          |  $isNull = ${ev.isNull};
    +          |  $value = ${ev.value};
    +          |}
    +         """.stripMargin
    +        ExprCode(code, isNull, value)
    +      }
         }
    +
    +    // output variables
         val resultVars = buildSide match {
           case BuildLeft => buildColumns ++ input
           case BuildRight => input ++ buildColumns
         }
     
    -    val outputCode = if (condition.isDefined) {
    -      // filter the output via condition
    -      ctx.currentVars = resultVars
    -      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    -      s"""
    -         | ${ev.code}
    -         | if (!${ev.isNull} && ${ev.value}) {
    -         |   ${consume(ctx, resultVars)}
    -         | }
    -       """.stripMargin
    -    } else {
    -      consume(ctx, resultVars)
    -    }
    +    if (joinType == Inner) {
    +      val outputCode = if (condition.isDefined) {
    +        // filter the output via condition
    +        ctx.currentVars = resultVars
    +        val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +        s"""
    +         |${ev.code}
    +         |if (!${ev.isNull} && ${ev.value}) {
    +         |  ${consume(ctx, resultVars)}
    +         |}
    +         """.stripMargin
    +      } else {
    +        consume(ctx, resultVars)
    +      }
     
    -    if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    -      s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashedRelation
    -         | UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    -         | if ($matched != null) {
    -         |   ${buildColumns.map(_.code).mkString("\n")}
    -         |   $outputCode
    -         | }
    -     """.stripMargin
    +      if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    +         |if ($matched != null) {
    +         |  ${buildColumns.map(_.code).mkString("\n")}
    +         |  $outputCode
    +         |}
    +         """.stripMargin
    +
    +      } else {
    +        val matches = ctx.freshName("matches")
    +        val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +        val i = ctx.freshName("i")
    +        val size = ctx.freshName("size")
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashRelation
    +         |$bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get(${keyVal.value});
    +         |if ($matches != null) {
    +         |  int $size = $matches.size();
    +         |  for (int $i = 0; $i < $size; $i++) {
    +         |    UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    +         |    ${buildColumns.map(_.code).mkString("\n")}
    +         |    $outputCode
    +         |  }
    +         |}
    +         """.stripMargin
    +      }
     
         } else {
    -      val matches = ctx.freshName("matches")
    -      val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    -      val i = ctx.freshName("i")
    -      val size = ctx.freshName("size")
    -      s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashRelation
    -         | $bufferType $matches = ${anyNull} ? null :
    -         |  ($bufferType) $relationTerm.get(${keyVal.value});
    -         | if ($matches != null) {
    -         |   int $size = $matches.size();
    -         |   for (int $i = 0; $i < $size; $i++) {
    -         |     UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    -         |     ${buildColumns.map(_.code).mkString("\n")}
    -         |     $outputCode
    -         |   }
    -         | }
    -     """.stripMargin
    +      // LeftOuter and RightOuter
    +
    +      // filter the output via condition
    +      val checkCondition = if (condition.isDefined) {
    +        ctx.currentVars = resultVars
    +        val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +        s"""
    +         |boolean $valid = true;
    +         |if ($matched != null) {
    +         |  ${ev.code}
    +         |  $valid = !${ev.isNull} && ${ev.value};
    +         |}
    +         """.stripMargin
    +      } else {
    +        s"final boolean $valid = true;"
    +      }
    +
    +      if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    +         |${buildColumns.map(_.code).mkString("\n")}
    +         |${checkCondition.trim}
    +         |if (!$valid) {
    +         |  // reset to null
    +         |  ${buildColumns.map(v => s"${v.isNull} = true;").mkString("\n")}
    --- End diff --
    
    This is a left outer join, so when the condition fails the build side should be null.
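
To make the reset step concrete, here is a rough sketch of the unique-key outer-join path, assuming a `Map` stands in for the hashed relation and strings stand in for rows (the class, method, and parameter names are hypothetical; only the `valid` flag and the "reset to null" step mirror the quoted diff):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical illustration of the unique-key outer join path; not Spark's generated code.
final class UniqueKeyOuterJoinSketch {

    /** Always returns exactly one output row for the given streamed row. */
    static String joinRow(
            Integer key,
            String streamed,
            Map<Integer, String> relation,
            BiPredicate<String, String> condition) {
        // Find the match; a null key never matches.
        String matched = key == null ? null : relation.get(key);

        // Build-side variables are declared even when nothing matched.
        boolean buildIsNull = matched == null;
        String buildValue = matched;

        // The condition is only evaluated when there is a matched row.
        boolean valid = true;
        if (matched != null) {
            valid = condition.test(streamed, matched);
        }
        if (!valid) {
            // Reset to null: the streamed row is kept, the build side is dropped.
            buildIsNull = true;
        }
        return streamed + " | " + (buildIsNull ? "NULL" : buildValue);
    }

    public static void main(String[] args) {
        Map<Integer, String> relation = new HashMap<>();
        relation.put(1, "b1");
        System.out.println(joinRow(1, "s1", relation, (s, b) -> true));
        System.out.println(joinRow(1, "s1", relation, (s, b) -> false));
        System.out.println(joinRow(2, "s2", relation, (s, b) -> true));
    }
}
```

Unlike an inner join, the streamed row is never filtered out here: a failed condition only turns the build-side columns into nulls.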


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181752866
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11130


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184495643
  
    **[Test build #51333 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51333/consoleFull)** for PR 11130 at commit [`5744941`](https://github.com/apache/spark/commit/5744941063ba05b07e4a7265277162c331a9c48c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52399292
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -163,64 +192,142 @@ case class BroadcastHashJoin(
     
         // find the matches from HashedRelation
         val matched = ctx.freshName("matched")
    +    val valid = ctx.freshName("invalid")
     
         // create variables for output
         ctx.currentVars = null
         ctx.INPUT_ROW = matched
         val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    -      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      val ev = BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      if (joinType == Inner) {
    +        ev
    +      } else {
    +        val isNull = ctx.freshName("isNull")
    +        val value = ctx.freshName("value")
    +        val code = s"""
    +          |boolean $isNull = true;
    +          |${ctx.javaType(a.dataType)} $value = ${ctx.defaultValue(a.dataType)};
    +          |if ($matched != null) {
    +          |  ${ev.code}
    +          |  $isNull = ${ev.isNull};
    +          |  $value = ${ev.value};
    +          |}
    +         """.stripMargin
    +        ExprCode(code, isNull, value)
    +      }
         }
    +
    +    // output variables
         val resultVars = buildSide match {
           case BuildLeft => buildColumns ++ input
           case BuildRight => input ++ buildColumns
         }
     
    -    val outputCode = if (condition.isDefined) {
    -      // filter the output via condition
    -      ctx.currentVars = resultVars
    -      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    -      s"""
    -         | ${ev.code}
    -         | if (!${ev.isNull} && ${ev.value}) {
    -         |   ${consume(ctx, resultVars)}
    -         | }
    -       """.stripMargin
    -    } else {
    -      consume(ctx, resultVars)
    -    }
    +    if (joinType == Inner) {
    --- End diff --
    
    Can you break this function up into codegenInner() and codegenOuter()?


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184495758
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51333/
    Test PASSed.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182039001
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838805
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -184,36 +240,106 @@ case class BroadcastHashJoin(
     
         if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
           s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashedRelation
    -         | UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    -         | if ($matched != null) {
    -         |   ${buildColumns.map(_.code).mkString("\n")}
    -         |   $outputCode
    -         | }
    -     """.stripMargin
    +         |// generate join key
    +         |${keyEv.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
    +         |if ($matched != null) {
    +         |  ${buildVars.map(_.code).mkString("\n")}
    +         |  $outputCode
    +         |}
    +       """.stripMargin
    +
    +    } else {
    +      val matches = ctx.freshName("matches")
    +      val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +      val i = ctx.freshName("i")
    +      val size = ctx.freshName("size")
    +      s"""
    +         |// generate join key
    +         |${keyEv.code}
    +         |// find matches from HashRelation
    +         |$bufferType $matches = $anyNull ? null : ($bufferType)$relationTerm.get(${keyEv.value});
    +         |if ($matches != null) {
    +         |  int $size = $matches.size();
    +         |  for (int $i = 0; $i < $size; $i++) {
    +         |    UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    +         |    ${buildVars.map(_.code).mkString("\n")}
    +         |    $outputCode
    +         |  }
    +         |}
    +       """.stripMargin
    +    }
    +  }
    +
    +
    +  private def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    --- End diff --
    
    I already tried hard to reduce the duplicated code; it will be harder to understand if we have more small fragments.


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181752684
  
    **[Test build #50961 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50961/consoleFull)** for PR 11130 at commit [`52efe91`](https://github.com/apache/spark/commit/52efe91168a4be7ce721d2f56e2b1e7aab9379db).


[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-183472115
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184468206
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184467627
  
    **[Test build #51329 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/51329/consoleFull)** for PR 11130 at commit [`5724180`](https://github.com/apache/spark/commit/57241806ae31130429cb68a58a4086f15c3965f4).




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838542
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -184,36 +240,106 @@ case class BroadcastHashJoin(
     
         if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
           s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashedRelation
    -         | UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    -         | if ($matched != null) {
    -         |   ${buildColumns.map(_.code).mkString("\n")}
    -         |   $outputCode
    -         | }
    -     """.stripMargin
    +         |// generate join key
    --- End diff --
    
    generate join key -> generate join key for stream side




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-184468211
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/51329/
    Test FAILed.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-181988539
  
    For this query:
    ```
    val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
    sqlContext.range(N).join(dim, (col("id") % 60000) === col("k"), "left").count()
    ```
    
    the following code is generated:
    
    ```
    /* 001 */
    /* 002 */ public Object generate(Object[] references) {
    /* 003 */   return new GeneratedIterator(references);
    /* 004 */ }
    /* 005 */
    /* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */
    /* 008 */   private Object[] references;
    /* 009 */   private boolean agg_initAgg;
    /* 010 */   private boolean agg_bufIsNull;
    /* 011 */   private long agg_bufValue;
    /* 012 */   private org.apache.spark.broadcast.TorrentBroadcast join_broadcast;
    /* 013 */   private org.apache.spark.sql.execution.joins.LongArrayRelation join_relation;
    /* 014 */   private boolean range_initRange;
    /* 015 */   private long range_partitionEnd;
    /* 016 */   private long range_number;
    /* 017 */   private boolean range_overflow;
    /* 018 */   private UnsafeRow agg_result;
    /* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
    /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
    /* 021 */
    /* 022 */   public GeneratedIterator(Object[] references) {
    /* 023 */     this.references = references;
    /* 024 */     agg_initAgg = false;
    /* 025 */
    /* 026 */
    /* 027 */     this.join_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[0];
    /* 028 */
    /* 029 */     join_relation = (org.apache.spark.sql.execution.joins.LongArrayRelation) join_broadcast.value();
    /* 030 */     incPeakExecutionMemory(join_relation.getMemorySize());
    /* 031 */
    /* 032 */     range_initRange = false;
    /* 033 */     range_partitionEnd = 0L;
    /* 034 */     range_number = 0L;
    /* 035 */     range_overflow = false;
    /* 036 */     agg_result = new UnsafeRow(1);
    /* 037 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
    /* 038 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
    /* 039 */   }
    /* 040 */
    /* 041 */
    /* 042 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
    /* 043 */     // initialize aggregation buffer
    /* 044 */
    /* 045 */     agg_bufIsNull = false;
    /* 046 */     agg_bufValue = 0L;
    /* 047 */
    /* 048 */
    /* 049 */
    /* 050 */
    /* 051 */     // initialize Range
    /* 052 */     if (!range_initRange) {
    /* 053 */       range_initRange = true;
    /* 054 */       if (input.hasNext()) {
    /* 055 */         initRange(((InternalRow) input.next()).getInt(0));
    /* 056 */       } else {
    /* 057 */         return;
    /* 058 */       }
    /* 059 */     }
    /* 060 */
    /* 061 */     while (!range_overflow && range_number < range_partitionEnd) {
    /* 062 */       long range_value = range_number;
    /* 063 */       range_number += 1L;
    /* 064 */       if (range_number < range_value ^ 1L < 0) {
    /* 065 */         range_overflow = true;
    /* 066 */       }
    /* 067 */
    /* 068 */       // generate join key
    /* 069 */       /* (input[0, bigint] % 60000) */
    /* 070 */       boolean join_isNull = false;
    /* 071 */       long join_value = -1L;
    /* 072 */       if (false || 60000L == 0) {
    /* 073 */         join_isNull = true;
    /* 074 */       } else {
    /* 075 */
    /* 076 */         if (false) {
    /* 077 */           join_isNull = true;
    /* 078 */         } else {
    /* 079 */           join_value = (long)(range_value % 60000L);
    /* 080 */         }
    /* 081 */       }
    /* 082 */       // find matches from HashedRelation
    /* 083 */       UnsafeRow join_matched = join_isNull ? null: (UnsafeRow)join_relation.getValue(join_value);
    /* 084 */
    /* 085 */       boolean join_isNull4 = true;
    /* 086 */       long join_value4 = -1L;
    /* 087 */       if (join_matched != null) {
    /* 088 */         /* input[0, bigint] */
    /* 089 */         long join_value3 = join_matched.getLong(0);
    /* 090 */         join_isNull4 = false;
    /* 091 */         join_value4 = join_value3;
    /* 092 */       }
    /* 093 */
    /* 094 */       final boolean join_invalid = true;
    /* 095 */       if (!join_invalid) {
    /* 096 */         // reset to null
    /* 097 */         join_isNull4 = true;
    /* 098 */       }
    /* 099 */
    /* 100 */
    /* 101 */
    /* 102 */
    /* 103 */       // do aggregate
    /* 104 */       /* (input[0, bigint] + 1) */
    /* 105 */       long agg_value1 = -1L;
    /* 106 */       agg_value1 = agg_bufValue + 1L;
    /* 107 */       // update aggregation buffer
    /* 108 */       agg_bufIsNull = false;
    /* 109 */       agg_bufValue = agg_value1;
    /* 110 */
    /* 111 */
    /* 112 */
    /* 113 */
    /* 114 */       if (shouldStop()) return;
    /* 115 */     }
    /* 116 */
    /* 117 */
    /* 118 */   }
    /* 119 */
    /* 120 */
    /* 121 */   private void initRange(int idx) {
    /* 122 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
    /* 123 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
    /* 124 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(102400L);
    /* 125 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
    /* 126 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
    /* 127 */
    /* 128 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
    /* 129 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
    /* 130 */       range_number = Long.MAX_VALUE;
    /* 131 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
    /* 132 */       range_number = Long.MIN_VALUE;
    /* 133 */     } else {
    /* 134 */       range_number = st.longValue();
    /* 135 */     }
    /* 136 */
    /* 137 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
    /* 138 */     .multiply(step).add(start);
    /* 139 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
    /* 140 */       range_partitionEnd = Long.MAX_VALUE;
    /* 141 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
    /* 142 */       range_partitionEnd = Long.MIN_VALUE;
    /* 143 */     } else {
    /* 144 */       range_partitionEnd = end.longValue();
    /* 145 */     }
    /* 146 */   }
    /* 147 */
    /* 148 */
    /* 149 */   protected void processNext() throws java.io.IOException {
    /* 150 */     if (!agg_initAgg) {
    /* 151 */       agg_initAgg = true;
    /* 152 */       agg_doAggregateWithoutKey();
    /* 153 */
    /* 154 */       // output the result
    /* 155 */
    /* 156 */
    /* 157 */       agg_rowWriter.zeroOutNullBytes();
    /* 158 */
    /* 159 */
    /* 160 */       if (agg_bufIsNull) {
    /* 161 */         agg_rowWriter.setNullAt(0);
    /* 162 */       } else {
    /* 163 */         agg_rowWriter.write(0, agg_bufValue);
    /* 164 */       }
    /* 165 */       currentRows.add(agg_result.copy());
    /* 166 */     }
    /* 167 */   }
    /* 168 */ }
    /* 169 */
    ```
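
The per-row logic of the generated loop above can be restated without any Spark classes. The following is only a minimal sketch, assuming a plain `HashMap` stands in for the broadcast relation with unique keys; the class `LeftOuterUniqueSketch`, the method `joinCounts`, and the `modulus` parameter are hypothetical names for illustration and are not part of the PR. It shows that a left outer join emits every stream-side row, whether or not the lookup finds a build-side row.

```java
import java.util.HashMap;
import java.util.Map;

public class LeftOuterUniqueSketch {

    /**
     * Walks ids 0..n-1, derives the join key as id % modulus, and looks it up
     * in a unique-key build side. Returns {totalRows, rowsWithoutMatch}.
     * A plain Map is an assumption standing in for the broadcast relation.
     */
    public static long[] joinCounts(long n, long modulus, Map<Long, String> buildSide) {
        long total = 0L;
        long unmatched = 0L;
        for (long id = 0L; id < n; id++) {
            long key = id % modulus;              // stream-side join key
            String matched = buildSide.get(key);  // at most one build row per key
            if (matched == null) {
                // Outer join: the build-side columns would be null for this row.
                unmatched++;
            }
            // The row is emitted in both cases, so it always counts.
            total++;
        }
        return new long[] {total, unmatched};
    }

    public static void main(String[] args) {
        Map<Long, String> dim = new HashMap<>();
        for (long k = 0L; k < 50L; k++) {
            dim.put(k, Long.toString(k));
        }
        long[] counts = joinCounts(200L, 100L, dim);
        // prints "200 rows, 100 without a match"
        System.out.println(counts[0] + " rows, " + counts[1] + " without a match");
    }
}
```

In the query from this comment every key below 60000 exists in `dim`, so no row would be unmatched there; the smaller numbers in `main` are chosen only to exercise both branches.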




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838817
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withLeft(currentRow)
    +            leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
    +          })
    +
    +        case RightOuter =>
    +          streamedIter.flatMap(currentRow => {
    +            val rowKey = keyGenerator(currentRow)
    +            joinedRow.withRight(currentRow)
    +            rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)
    +          })
    +
    +        case x =>
    +          throw new IllegalArgumentException(
    +            s"BroadcastHashJoin should not take $x as the JoinType")
    +      }
         }
       }
     
    -  private var broadcastRelation: Broadcast[HashedRelation] = _
    -  // the term for hash relation
    -  private var relationTerm: String = _
    -
       override def upstream(): RDD[InternalRow] = {
         streamedPlan.asInstanceOf[CodegenSupport].upstream()
       }
     
       override def doProduce(ctx: CodegenContext): String = {
    +    streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    if (joinType == Inner) {
    +      codegenInner(ctx, input)
    +    } else {
    +      // LeftOuter and RightOuter
    +      codegenOuter(ctx, input)
    +    }
    +  }
    +
    +  private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
         // create a name for HashedRelation
    -    broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
         val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    -    relationTerm = ctx.freshName("relation")
    +    val relationTerm = ctx.freshName("relation")
         val clsName = broadcastRelation.value.getClass.getName
         ctx.addMutableState(clsName, relationTerm,
           s"""
              | $relationTerm = ($clsName) $broadcast.value();
              | incPeakExecutionMemory($relationTerm.getMemorySize());
            """.stripMargin)
    -
    -    s"""
    -       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    -     """.stripMargin
    +    (broadcastRelation, relationTerm)
       }
     
    -  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    -    // generate the key as UnsafeRow or Long
    +  private def genJoinKey(ctx: CodegenContext, input: Seq[ExprCode]): (ExprCode, String) = {
    --- End diff --
    
    Also rename this to genStreamSideJoinKey so that it matches genBuildSideVars.




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838770
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -184,36 +240,106 @@ case class BroadcastHashJoin(
     
         if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
           s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashedRelation
    -         | UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyVal.value});
    -         | if ($matched != null) {
    -         |   ${buildColumns.map(_.code).mkString("\n")}
    -         |   $outputCode
    -         | }
    -     """.stripMargin
    +         |// generate join key
    +         |${keyEv.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
    +         |if ($matched != null) {
    +         |  ${buildVars.map(_.code).mkString("\n")}
    +         |  $outputCode
    +         |}
    +       """.stripMargin
    +
    +    } else {
    +      val matches = ctx.freshName("matches")
    +      val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +      val i = ctx.freshName("i")
    +      val size = ctx.freshName("size")
    +      s"""
    +         |// generate join key
    +         |${keyEv.code}
    +         |// find matches from HashRelation
    +         |$bufferType $matches = $anyNull ? null : ($bufferType)$relationTerm.get(${keyEv.value});
    +         |if ($matches != null) {
    +         |  int $size = $matches.size();
    +         |  for (int $i = 0; $i < $size; $i++) {
    +         |    UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    +         |    ${buildVars.map(_.code).mkString("\n")}
    +         |    $outputCode
    +         |  }
    +         |}
    +       """.stripMargin
    +    }
    +  }
    +
    +
    +  private def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    --- End diff --
    
    There is a lot of code duplicated from codegenInner. Can we merge them?
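
For readers following the diff above, the two lookup shapes it generates can be sketched in plain Java. This is only an illustration under stated assumptions: a `Map<Long, String>` stands in for a relation with unique keys (one row per lookup, as with `getValue`), and a `Map<Long, List<String>>` stands in for the general case (a buffer of rows per lookup, as with `get`). The class `InnerLookupSketch` and its methods are hypothetical and do not appear in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InnerLookupSketch {

    // Unique keys: a lookup yields at most one build row, so no inner loop is needed.
    public static List<String> joinUnique(long[] streamKeys, Map<Long, String> relation) {
        List<String> out = new ArrayList<>();
        for (long key : streamKeys) {
            String matched = relation.get(key);
            if (matched != null) {
                out.add(key + ":" + matched);
            }
        }
        return out;
    }

    // Non-unique keys: a lookup yields a buffer of rows, iterated by index.
    public static List<String> joinMulti(long[] streamKeys, Map<Long, List<String>> relation) {
        List<String> out = new ArrayList<>();
        for (long key : streamKeys) {
            List<String> matches = relation.get(key);
            if (matches != null) {
                int size = matches.size();
                for (int i = 0; i < size; i++) {
                    out.add(key + ":" + matches.get(i));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, String> unique = new HashMap<>();
        unique.put(1L, "a");
        System.out.println(joinUnique(new long[] {1L, 2L, 1L}, unique));

        Map<Long, List<String>> multi = new HashMap<>();
        multi.put(1L, Arrays.asList("a", "b"));
        System.out.println(joinMulti(new long[] {1L, 2L}, multi));
    }
}
```

Both methods implement an inner join, so stream keys without a match produce no output; the only difference is whether a single row or a buffer of rows comes back from the lookup.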





[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52398813
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -163,64 +192,142 @@ case class BroadcastHashJoin(
     
         // find the matches from HashedRelation
         val matched = ctx.freshName("matched")
    +    val valid = ctx.freshName("invalid")
    --- End diff --
    
    This is confusing: the variable is named `valid`, but the generated name is "invalid".




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11130#issuecomment-182557869
  
    **[Test build #2531 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2531/consoleFull)** for PR 11130 at commit [`da45df1`](https://github.com/apache/spark/commit/da45df1536f112a14bfe15d6d30d307cdbd99d5b).




[GitHub] spark pull request: [SPARK-13237] [SQL] generated broadcast outer ...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52838778
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -105,75 +107,129 @@ case class BroadcastHashJoin(
         val broadcastRelation = Await.result(broadcastFuture, timeout)
     
         streamedPlan.execute().mapPartitions { streamedIter =>
    -      val hashedRelation = broadcastRelation.value
    -      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
    -      hashJoin(streamedIter, hashedRelation, numOutputRows)
    +      val joinedRow = new JoinedRow()
    +      val hashTable = broadcastRelation.value
    +      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashTable.getMemorySize)
    +      val keyGenerator = streamSideKeyGenerator
    +      val resultProj = createResultProjection
    +
    +      joinType match {
    +        case Inner =>
    +          hashJoin(streamedIter, hashTable, numOutputRows)
    +
    +        case LeftOuter =>
    +          streamedIter.flatMap(currentRow => {
    --- End diff --
    
    Sure, I will do it when I update this PR.

