Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2016/01/29 22:04:04 UTC

[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/10989

    [SPARK-12798] [SQL] generated BroadcastHashJoin [WIP]

    This is based on #10855 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark gen_join

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10989.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10989
    
----
commit 3e792f3569d7a397e2817ac3b66816a3c35feed0
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-21T00:23:04Z

    generate aggregation with grouping keys

commit 2f1a0821f6f6dc7b968f04fdf2ddfc93ec6b6677
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-21T00:58:40Z

    support Final aggregate

commit 7d1bd43aafd7c38120b9508830e7a22db11371b4
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-21T02:57:22Z

    fix tests

commit 788078668795458aa29a55d18e2b23686992df8d
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-21T06:05:57Z

    cleanup

commit 407460d15a7dc692b852c035a5473bd335f2c87b
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-21T08:19:26Z

    generated BroadcastHashJoin

commit 9a42b522cb483a3502ee36cea6672c36f2e40b46
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-26T02:15:59Z

    address comments

commit 37bc7f0a34c6bb7d941cfa146af471d1f83ab04a
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-27T06:57:10Z

    fix tests

commit 48e125cd7623af1af2b9f82b9ca8ddeca438ad17
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-27T06:58:43Z

    Merge branch 'master' of github.com:apache/spark into gen_keys
    
    Conflicts:
    	sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

commit efe7fa26d0ed49c09ec886185713f235c645570f
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-28T22:47:05Z

    Merge branch 'master' of github.com:apache/spark into gen_keys
    
    Conflicts:
    	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
    	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
    	sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

commit 3bfdeb2cacebe8567f2d0123c853a20de84fa158
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-28T23:32:34Z

    adress comment

commit 858c1e3b0ce20dad0ee23d2444267f5a640fac2a
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-29T18:09:59Z

    Merge branch 'master' of github.com:apache/spark into gen_keys
    
    Conflicts:
    	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala

commit be2e53bd3be70604b885a7a219a1a28801d75320
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-29T19:20:01Z

    minor

commit f234c21655f8ceb2f0567a170c38e1004b6cd3d9
Author: Davies Liu <da...@databricks.com>
Date:   2016-01-29T19:38:53Z

    Merge branch 'gen_keys' into gen_join
    
    Conflicts:
    	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
    	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51356463
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -54,13 +54,27 @@ public void setInput(Iterator<InternalRow> iter) {
       }
     
       /**
    +   * Returns whether it should stop processing next row or not.
    --- End diff --
    
    What does "it" refer to here?


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177090990
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178800319
  
    **[Test build #50587 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50587/consoleFull)** for PR 10989 at commit [`4d75022`](https://github.com/apache/spark/commit/4d7502206d17ae3f148a620bed15eff38b35018e).


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51630930
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -389,14 +379,16 @@ case class TungstenAggregate(
            UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
            UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
            $outputCode
    +
    +       if (shouldStop()) return;
          }
     
    -     $thisPlan.updatePeakMemory($hashMapTerm);
    +     incPeakExecutionMemory($hashMapTerm.getPeakMemoryUsedBytes());
    --- End diff --
    
    Can we bake the peak memory usage update into `hashMapTerm.free()`? This seems like something we'll forget to do.
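
One way to picture this suggestion is a buffer that tracks its own peak and reports it from `free()`, so callers cannot forget the accounting step. The sketch below is only an illustration: `TrackedBuffer`, `allocate`, `release`, and the `LongConsumer` callback are hypothetical names, not Spark's hash map API or its `incPeakExecutionMemory` method.

```java
import java.util.function.LongConsumer;

// Hypothetical sketch; these names are assumptions, not Spark classes.
final class TrackedBuffer {
    private final LongConsumer peakReporter;  // stand-in for a peak-memory metric update
    private long usedBytes = 0;
    private long peakBytes = 0;
    private boolean freed = false;

    TrackedBuffer(LongConsumer peakReporter) {
        this.peakReporter = peakReporter;
    }

    void allocate(long bytes) {
        usedBytes += bytes;
        peakBytes = Math.max(peakBytes, usedBytes);
    }

    void release(long bytes) {
        usedBytes -= bytes;
    }

    // Reports the peak exactly once, as part of cleanup.
    void free() {
        if (freed) {
            return;  // a second call must not report the peak again
        }
        freed = true;
        peakReporter.accept(peakBytes);
        usedBytes = 0;
    }

    public static void main(String[] args) {
        TrackedBuffer buffer = new TrackedBuffer(peak -> System.out.println("peak bytes: " + peak));
        buffer.allocate(100);
        buffer.release(40);
        buffer.allocate(20);
        buffer.free();
    }
}
```

The trade-off in this sketch is that the peak is only reported on code paths that actually reach `free()`.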


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179062775
  
    **[Test build #50650 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50650/consoleFull)** for PR 10989 at commit [`e0c8c65`](https://github.com/apache/spark/commit/e0c8c652b86ce9d17bcb5d629e6b55563b5c382b).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51683070
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -389,14 +379,16 @@ case class TungstenAggregate(
            UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
            UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
            $outputCode
    +
    +       if (shouldStop()) return;
    --- End diff --
    
    Once `shouldStop()` returns true, the caller should exit the loop (via return).
    
    `map.free()` is called only once the loop has consumed all the items (that is, when it exits without an early return).
    
    Will add these to the doc string of `shouldStop()`.
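
The contract described in this comment can be sketched in plain Java. This is a minimal illustration under assumptions, not the generated code or Spark's `BufferedRowIterator`: the class `ShouldStopSketch`, its `currentRows` queue, and the `freed` flag are hypothetical stand-ins. The producing loop returns as soon as `shouldStop()` is true, and the cleanup runs only when the loop falls through because the input is exhausted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch; these names are assumptions, not Spark classes.
final class ShouldStopSketch {
    private final Iterator<String> input;
    private final Queue<String> currentRows = new ArrayDeque<>();
    boolean freed = false;  // records whether the cleanup ran

    ShouldStopSketch(List<String> rows) {
        this.input = rows.iterator();
    }

    // True once a row is buffered: the producer should stop and hand it over.
    boolean shouldStop() {
        return !currentRows.isEmpty();
    }

    // Stand-in for map.free().
    void free() {
        freed = true;
    }

    void processNext() {
        while (input.hasNext()) {
            currentRows.add(input.next());
            if (shouldStop()) {
                return;  // early exit: the cleanup below is skipped
            }
        }
        free();  // reached only when the loop consumed all the items
    }

    // Returns the next row, or null when there is no more input.
    String next() {
        if (currentRows.isEmpty()) {
            processNext();
        }
        return currentRows.poll();
    }

    public static void main(String[] args) {
        ShouldStopSketch it = new ShouldStopSketch(Arrays.asList("a", "b"));
        for (String row = it.next(); row != null; row = it.next()) {
            System.out.println(row + " freed=" + it.freed);
        }
        System.out.println("done freed=" + it.freed);
    }
}
```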


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179063141
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50650/
    Test FAILed.


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179063137
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-176974522
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177064691
  
    **[Test build #50433 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50433/consoleFull)** for PR 10989 at commit [`89614a5`](https://github.com/apache/spark/commit/89614a55a3a80a7a62dc7934cadf553adb34ff10).


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177090938
  
    **[Test build #50437 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50437/consoleFull)** for PR 10989 at commit [`1ecce29`](https://github.com/apache/spark/commit/1ecce29b10ee7ee27577a8955f4a7db4d933ba66).


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-176974525
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50409/
    Test FAILed.


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179396034
  
    Merging this into master.


[GitHub] spark pull request #10989: [SPARK-12798] [SQL] generated BroadcastHashJoin

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r222216289
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -117,6 +120,87 @@ case class BroadcastHashJoin(
           hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
         }
       }
    +
    +  // the term for hash relation
    +  private var relationTerm: String = _
    +
    +  override def upstream(): RDD[InternalRow] = {
    +    streamedPlan.asInstanceOf[CodegenSupport].upstream()
    +  }
    +
    +  override def doProduce(ctx: CodegenContext): String = {
    +    // create a name for HashRelation
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    +    relationTerm = ctx.freshName("relation")
    +    // TODO: create specialized HashRelation for single join key
    +    val clsName = classOf[UnsafeHashedRelation].getName
    +    ctx.addMutableState(clsName, relationTerm,
    +      s"""
    +         | $relationTerm = ($clsName) $broadcast.value();
    +         | incPeakExecutionMemory($relationTerm.getUnsafeSize());
    +       """.stripMargin)
    +
    +    s"""
    +       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +     """.stripMargin
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    // generate the key as UnsafeRow
    +    ctx.currentVars = input
    +    val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
    +    val keyVal = GenerateUnsafeProjection.createCode(ctx, keyExpr)
    +    val keyTerm = keyVal.value
    +    val anyNull = if (keyExpr.exists(_.nullable)) s"$keyTerm.anyNull()" else "false"
    +
    +    // find the matches from HashedRelation
    +    val matches = ctx.freshName("matches")
    +    val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +    val i = ctx.freshName("i")
    +    val size = ctx.freshName("size")
    +    val row = ctx.freshName("row")
    +
    +    // create variables for output
    +    ctx.currentVars = null
    +    ctx.INPUT_ROW = row
    +    val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    +      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +    }
    +    val resultVars = buildSide match {
    +      case BuildLeft => buildColumns ++ input
    +      case BuildRight => input ++ buildColumns
    +    }
    +
    +    val ouputCode = if (condition.isDefined) {
    +      // filter the output via condition
    +      ctx.currentVars = resultVars
    +      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +      s"""
    +         | ${ev.code}
    +         | if (!${ev.isNull} && ${ev.value}) {
    +         |   ${consume(ctx, resultVars)}
    +         | }
    +       """.stripMargin
    +    } else {
    +      consume(ctx, resultVars)
    +    }
    +
    +    s"""
    +       | // generate join key
    +       | ${keyVal.code}
    +       | // find matches from HashRelation
    +       | $bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get($keyTerm);
    +       | if ($matches != null) {
    +       |   int $size = $matches.size();
    +       |   for (int $i = 0; $i < $size; $i++) {
    --- End diff --
    
    Mmmh, maybe I see your point now. I think it may be feasible, but a bit complex. We might keep a global variable for `$matches` and read from it in the produce method. Is this what you are saying? Just changing the code here wouldn't work IMHO, because in the next iteration the keys will have changed...
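
The idea of keeping the match list in longer-lived state can be sketched as follows. This is a hypothetical illustration, not the generated join code: `ResumableProbe`, its `Map`-based relation, and `nextRow()` are assumptions made for the example. Because the match list and the loop index live in fields, the probe can return after each row and resume in the middle of a key's matches without dropping rows, and it only advances to the next key once the current matches are used up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names come from Spark.
final class ResumableProbe {
    private final Map<Integer, List<String>> relation;  // stand-in for the hashed relation
    private final Iterator<Integer> streamedKeys;       // stand-in for the streamed side

    // State kept in fields so that it survives an early return.
    private Integer currentKey = null;
    private List<String> matches = null;
    private int i = 0;

    ResumableProbe(Map<Integer, List<String>> relation, List<Integer> keys) {
        this.relation = relation;
        this.streamedKeys = keys.iterator();
    }

    // Returns one joined row per call, or null once the streamed side is exhausted.
    String nextRow() {
        while (true) {
            if (matches != null && i < matches.size()) {
                // Resume inside the match list of the current key.
                return currentKey + ":" + matches.get(i++);
            }
            if (!streamedKeys.hasNext()) {
                return null;
            }
            // Only now move on to the next key; its matches replace the old ones.
            currentKey = streamedKeys.next();
            matches = relation.get(currentKey);  // null when the key has no match
            i = 0;
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> relation = new HashMap<>();
        relation.put(1, Arrays.asList("a", "b"));
        relation.put(3, Arrays.asList("c"));
        ResumableProbe probe = new ResumableProbe(relation, Arrays.asList(1, 2, 3));
        for (String row = probe.nextRow(); row != null; row = probe.nextRow()) {
            System.out.println(row);
        }
    }
}
```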


[GitHub] spark pull request #10989: [SPARK-12798] [SQL] generated BroadcastHashJoin

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r221966919
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -117,6 +120,87 @@ case class BroadcastHashJoin(
           hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
         }
       }
    +
    +  // the term for hash relation
    +  private var relationTerm: String = _
    +
    +  override def upstream(): RDD[InternalRow] = {
    +    streamedPlan.asInstanceOf[CodegenSupport].upstream()
    +  }
    +
    +  override def doProduce(ctx: CodegenContext): String = {
    +    // create a name for HashRelation
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    +    relationTerm = ctx.freshName("relation")
    +    // TODO: create specialized HashRelation for single join key
    +    val clsName = classOf[UnsafeHashedRelation].getName
    +    ctx.addMutableState(clsName, relationTerm,
    +      s"""
    +         | $relationTerm = ($clsName) $broadcast.value();
    +         | incPeakExecutionMemory($relationTerm.getUnsafeSize());
    +       """.stripMargin)
    +
    +    s"""
    +       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +     """.stripMargin
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    // generate the key as UnsafeRow
    +    ctx.currentVars = input
    +    val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
    +    val keyVal = GenerateUnsafeProjection.createCode(ctx, keyExpr)
    +    val keyTerm = keyVal.value
    +    val anyNull = if (keyExpr.exists(_.nullable)) s"$keyTerm.anyNull()" else "false"
    +
    +    // find the matches from HashedRelation
    +    val matches = ctx.freshName("matches")
    +    val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +    val i = ctx.freshName("i")
    +    val size = ctx.freshName("size")
    +    val row = ctx.freshName("row")
    +
    +    // create variables for output
    +    ctx.currentVars = null
    +    ctx.INPUT_ROW = row
    +    val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    +      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +    }
    +    val resultVars = buildSide match {
    +      case BuildLeft => buildColumns ++ input
    +      case BuildRight => input ++ buildColumns
    +    }
    +
    +    val ouputCode = if (condition.isDefined) {
    +      // filter the output via condition
    +      ctx.currentVars = resultVars
    +      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +      s"""
    +         | ${ev.code}
    +         | if (!${ev.isNull} && ${ev.value}) {
    +         |   ${consume(ctx, resultVars)}
    +         | }
    +       """.stripMargin
    +    } else {
    +      consume(ctx, resultVars)
    +    }
    +
    +    s"""
    +       | // generate join key
    +       | ${keyVal.code}
    +       | // find matches from HashRelation
    +       | $bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get($keyTerm);
    +       | if ($matches != null) {
    +       |   int $size = $matches.size();
    +       |   for (int $i = 0; $i < $size; $i++) {
    --- End diff --
    
    Mmmh... this code seems rather outdated; I couldn't find it in the current codebase. Anyway, I don't understand why you want to interrupt it. AFAIU, this generates the result from all the matches of a row, so if we interrupted it somehow we would end up returning a wrong result (we would omit some rows from the result...).


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51631359
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---
    @@ -389,14 +379,16 @@ case class TungstenAggregate(
            UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
            UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
            $outputCode
    +
    +       if (shouldStop()) return;
    --- End diff --
    
    Can you document the required behavior of `shouldStop()`? How does it need to behave so that the cleanup below (`hashMapTerm.free()`) is called?


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178809120
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177085380
  
    **[Test build #50435 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50435/consoleFull)** for PR 10989 at commit [`dcf4fdc`](https://github.com/apache/spark/commit/dcf4fdc0b955fb5abd9686447710ecc0a76ce990).


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177064746
  
    **[Test build #50433 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50433/consoleFull)** for PR 10989 at commit [`89614a5`](https://github.com/apache/spark/commit/89614a55a3a80a7a62dc7934cadf553adb34ff10).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51356558
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---
    @@ -81,6 +82,30 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
         benchmark.run()
       }
     
    +  def testBroadcastHashJoin(values: Int): Unit = {
    +    val benchmark = new Benchmark("BroadcastHashJoin", values)
    +
    +    val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
    +
    +    benchmark.addCase("BroadcastHashJoin w/o codegen") { iter =>
    +      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
    +      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
    +    }
    +    benchmark.addCase(s"BroadcastHashJoin w codegen") { iter =>
    +      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
    +      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
    +    }
    +
    +    /*
    +      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
    +      BroadcastHashJoin:                 Avg Time(ms)    Avg Rate(M/s)  Relative Rate
    +      -------------------------------------------------------------------------------
    +      BroadcastHashJoin w/o codegen           3053.41             3.43         1.00 X
    +      BroadcastHashJoin w codegen             1028.40            10.20         2.97 X
    --- End diff --
    
    Can you also run a benchmark using a larger range so that we amortize the broadcast overhead? I'm interested in seeing what the improvement is for the join part of the benchmark.
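
The amortization argument can be made concrete with a simple cost model. This is a sketch under stated assumptions: both variants are assumed to pay the same fixed cost plus a per-row cost, and the class `AmortizedSpeedup`, its parameters, and the numbers in `main` are hypothetical. Only the two average times passed to `ratio` come from the quoted benchmark, where their quotient matches the reported 2.97 X.

```java
// Hypothetical cost model; the parameters are illustrative, not measurements.
final class AmortizedSpeedup {
    // Measured speedup when both variants share the same fixed cost.
    static double speedup(double fixedMs, double rows, double slowMsPerRow, double fastMsPerRow) {
        return (fixedMs + rows * slowMsPerRow) / (fixedMs + rows * fastMsPerRow);
    }

    // Relative rate as the quotient of two average times.
    static double ratio(double slowMs, double fastMs) {
        return slowMs / fastMs;
    }

    public static void main(String[] args) {
        // With a per-row speedup of 3x, a fixed cost hides part of the gain on small inputs.
        System.out.println(speedup(100.0, 100.0, 3.0, 1.0));
        System.out.println(speedup(100.0, 1_000_000.0, 3.0, 1.0));
        // Quotient of the two average times quoted in the benchmark comment.
        System.out.println(ratio(3053.41, 1028.40));
    }
}
```

Under this model the measured speedup approaches the per-row speedup as the row count grows, which is why a larger range isolates the join part better.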
    



[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177091015
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51356688
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---
    @@ -81,6 +82,30 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
         benchmark.run()
       }
     
    +  def testBroadcastHashJoin(values: Int): Unit = {
    +    val benchmark = new Benchmark("BroadcastHashJoin", values)
    +
    +    val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
    +
    +    benchmark.addCase("BroadcastHashJoin w/o codegen") { iter =>
    +      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
    +      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
    +    }
    +    benchmark.addCase(s"BroadcastHashJoin w codegen") { iter =>
    +      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
    +      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
    +    }
    +
    +    /*
    +      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
    +      BroadcastHashJoin:                 Avg Time(ms)    Avg Rate(M/s)  Relative Rate
    +      -------------------------------------------------------------------------------
    +      BroadcastHashJoin w/o codegen           3053.41             3.43         1.00 X
    +      BroadcastHashJoin w codegen             1028.40            10.20         2.97 X
    --- End diff --
    
    Since the dimension table is pretty small, the overhead of the broadcast is also low. When I ran it with a larger range, the improvement did not change much, because the lookup in BytesToBytesMap is the bottleneck. I will open another PR to improve joins with a small dimension table.
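
A rough way to reason about the amortization question above, as a sketch only: treat total time as a fixed broadcast cost plus a per-row probe cost. The class name, the cost model, and the fixed and per-row values in `main` are assumptions made for illustration; only the two average times come from the benchmark comment in the diff.

```java
// Hypothetical cost model for illustration; this is not code from the PR.
final class BroadcastAmortization {

    /** Relative rate as the benchmark reports it: baseline time divided by candidate time. */
    static double relativeRate(double baselineMs, double candidateMs) {
        return baselineMs / candidateMs;
    }

    /**
     * Speed-up for `rows` streamed rows when both variants pay the same fixed
     * broadcast cost and differ only in their per-row probe cost.
     */
    static double speedup(double fixedMs, double slowPerRowMs, double fastPerRowMs, long rows) {
        return (fixedMs + slowPerRowMs * rows) / (fixedMs + fastPerRowMs * rows);
    }

    public static void main(String[] args) {
        // Average times taken from the benchmark comment in the diff.
        System.out.printf("reported relative rate: %.2f X%n", relativeRate(3053.41, 1028.40));

        // The fixed cost and per-row costs below are made-up illustration values.
        for (long rows : new long[] {1L << 16, 1L << 20, 1L << 24}) {
            System.out.printf("rows=%d speedup=%.2f%n", rows, speedup(50.0, 3e-4, 1e-4, rows));
        }
    }
}
```

Under this model the speed-up approaches the ratio of the per-row costs as the range grows. Once the fixed cost is small next to the probing work, a larger range barely moves the number, which would be consistent with the observation that the lookup dominates.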




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177091013
  
    **[Test build #50437 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50437/consoleFull)** for PR 10989 at commit [`1ecce29`](https://github.com/apache/spark/commit/1ecce29b10ee7ee27577a8955f4a7db4d933ba66).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #10989: [SPARK-12798] [SQL] generated BroadcastHashJoin

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r222235960
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -117,6 +120,87 @@ case class BroadcastHashJoin(
           hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
         }
       }
    +
    +  // the term for hash relation
    +  private var relationTerm: String = _
    +
    +  override def upstream(): RDD[InternalRow] = {
    +    streamedPlan.asInstanceOf[CodegenSupport].upstream()
    +  }
    +
    +  override def doProduce(ctx: CodegenContext): String = {
    +    // create a name for HashRelation
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    +    relationTerm = ctx.freshName("relation")
    +    // TODO: create specialized HashRelation for single join key
    +    val clsName = classOf[UnsafeHashedRelation].getName
    +    ctx.addMutableState(clsName, relationTerm,
    +      s"""
    +         | $relationTerm = ($clsName) $broadcast.value();
    +         | incPeakExecutionMemory($relationTerm.getUnsafeSize());
    +       """.stripMargin)
    +
    +    s"""
    +       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +     """.stripMargin
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    // generate the key as UnsafeRow
    +    ctx.currentVars = input
    +    val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
    +    val keyVal = GenerateUnsafeProjection.createCode(ctx, keyExpr)
    +    val keyTerm = keyVal.value
    +    val anyNull = if (keyExpr.exists(_.nullable)) s"$keyTerm.anyNull()" else "false"
    +
    +    // find the matches from HashedRelation
    +    val matches = ctx.freshName("matches")
    +    val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +    val i = ctx.freshName("i")
    +    val size = ctx.freshName("size")
    +    val row = ctx.freshName("row")
    +
    +    // create variables for output
    +    ctx.currentVars = null
    +    ctx.INPUT_ROW = row
    +    val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    +      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +    }
    +    val resultVars = buildSide match {
    +      case BuildLeft => buildColumns ++ input
    +      case BuildRight => input ++ buildColumns
    +    }
    +
    +    val ouputCode = if (condition.isDefined) {
    +      // filter the output via condition
    +      ctx.currentVars = resultVars
    +      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +      s"""
    +         | ${ev.code}
    +         | if (!${ev.isNull} && ${ev.value}) {
    +         |   ${consume(ctx, resultVars)}
    +         | }
    +       """.stripMargin
    +    } else {
    +      consume(ctx, resultVars)
    +    }
    +
    +    s"""
    +       | // generate join key
    +       | ${keyVal.code}
    +       | // find matches from HashRelation
    +       | $bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get($keyTerm);
    +       | if ($matches != null) {
    +       |   int $size = $matches.size();
    +       |   for (int $i = 0; $i < $size; $i++) {
    --- End diff --
    
    I see. Then we have to keep `matches` as global state instead of a local variable, so that we can go over the remaining matched rows in the next iterations. We also shouldn't fetch the next row from the streamed side, but reuse the previous row from that side. That way each call can produce a single result row without buffering all matched rows into `currentRows`, though it adds some complexity to the generated code.
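
To make the idea above concrete, here is a minimal sketch of a probe loop whose state lives in fields, so it can stop after one result row and resume on the next call. `ResumableProbe`, its fields, and the string row format are hypothetical stand-ins, not Spark classes or the code this PR generates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a generated iterator; none of these names are Spark APIs.
final class ResumableProbe {
    private final Iterator<Long> streamed;
    private final Map<Long, List<String>> relation;

    // State that would be local variables in a plain nested loop.
    private Long currentKey;       // streamed row kept across calls
    private List<String> matches;  // build rows matching currentKey, or null
    private int next;              // index of the next match to emit

    ResumableProbe(Iterator<Long> streamed, Map<Long, List<String>> relation) {
        this.streamed = streamed;
        this.relation = relation;
    }

    /** Returns one joined row per call, or null once the streamed side is exhausted. */
    String processNext() {
        while (true) {
            if (matches != null && next < matches.size()) {
                // Resume the interrupted loop and reuse the previous streamed row.
                return currentKey + "," + matches.get(next++);
            }
            if (!streamed.hasNext()) {
                return null;
            }
            currentKey = streamed.next();
            matches = relation.get(currentKey); // null when the key has no match
            next = 0;
        }
    }

    /** Small demo: key 1 has two matches, key 2 has none, key 3 has one. */
    static List<String> demo() {
        Map<Long, List<String>> relation = new HashMap<>();
        relation.put(1L, Arrays.asList("a", "b"));
        relation.put(3L, Arrays.asList("c"));
        ResumableProbe probe =
            new ResumableProbe(Arrays.asList(1L, 2L, 3L).iterator(), relation);
        List<String> out = new ArrayList<>();
        for (String row = probe.processNext(); row != null; row = probe.processNext()) {
            out.add(row);
        }
        return out;
    }
}
```

The cost is the extra bookkeeping: three fields and a re-entry check on every call, which is the added complexity in generated code mentioned above.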




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178795082
  
    ```
    val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
    sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
    ```
    
    ```
    /* 001 */
    /* 002 */ public Object generate(Object[] references) {
    /* 003 */   return new GeneratedIterator(references);
    /* 004 */ }
    /* 005 */
    /* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */
    /* 008 */   private Object[] references;
    /* 009 */   private boolean agg_initAgg0;
    /* 010 */   private boolean agg_bufIsNull1;
    /* 011 */   private long agg_bufValue2;
    /* 012 */   private org.apache.spark.broadcast.TorrentBroadcast broadcasthashjoin_broadcast6;
    /* 013 */   private org.apache.spark.sql.execution.joins.UnsafeHashedRelation broadcasthashjoin_relation7;
    /* 014 */   private boolean range_initRange8;
    /* 015 */   private long range_partitionEnd9;
    /* 016 */   private long range_number10;
    /* 017 */   private boolean range_overflow11;
    /* 018 */   private UnsafeRow broadcasthashjoin_result19;
    /* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder broadcasthashjoin_holder20;
    /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter broadcasthashjoin_rowWriter21;
    /* 021 */   private UnsafeRow agg_result37;
    /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder38;
    /* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter39;
    /* 024 */
    /* 025 */   private void initRange(int idx) {
    /* 050 */   }
    /* 051 */
    /* 052 */
    /* 053 */   private void agg_doAggregateWithoutKey5() throws java.io.IOException {
    /* 054 */     // initialize aggregation buffer
    
    /* 057 */     agg_bufIsNull1 = false;
    /* 058 */     agg_bufValue2 = 0L;
    
    /* 063 */     // initialize Range
    /* 064 */     if (!range_initRange8) {
    /* 065 */       range_initRange8 = true;
    /* 066 */       if (input.hasNext()) {
    /* 067 */         initRange(((InternalRow) input.next()).getInt(0));
    /* 068 */       } else {
    /* 069 */         return;
    /* 070 */       }
    /* 071 */     }
    /* 072 */
    /* 073 */     while (!range_overflow11 && range_number10 < range_partitionEnd9) {
    /* 074 */       long range_value12 = range_number10;
    /* 075 */       range_number10 += 1L;
    /* 076 */       if (range_number10 < range_value12 ^ 1L < 0) {
    /* 077 */         range_overflow11 = true;
    /* 078 */       }
    /* 079 */
    /* 080 */       // generate join key
    /* 084 */       broadcasthashjoin_rowWriter21.zeroOutNullBytes();
    /* 089 */       boolean broadcasthashjoin_isNull13 = false;
    /* 090 */       long broadcasthashjoin_value14 = -1L;
    /* 091 */       if (false || 60000L == 0) {
    /* 092 */         broadcasthashjoin_isNull13 = true;
    /* 093 */       } else {
    /* 094 */         /* input[0, bigint] */
    /* 095 */
    /* 096 */         if (false) {
    /* 097 */           broadcasthashjoin_isNull13 = true;
    /* 098 */         } else {
    /* 099 */           broadcasthashjoin_value14 = (long)(range_value12 % 60000L);
    /* 100 */         }
    /* 101 */       }
    /* 102 */       if (broadcasthashjoin_isNull13) {
    /* 103 */         broadcasthashjoin_rowWriter21.setNullAt(0);
    /* 104 */       } else {
    /* 105 */         broadcasthashjoin_rowWriter21.write(0, broadcasthashjoin_value14);
    /* 106 */       }
    /* 107 */
    /* 108 */
    /* 109 */       // find matches from HashRelation
    /* 110 */       org.apache.spark.util.collection.CompactBuffer broadcasthashjoin_matches23 = broadcasthashjoin_result19.anyNull() ? null : (org.apache.spark.util.collection.CompactBuffer) broadcasthashjoin_relation7.get(broadcasthashjoin_result19);
    /* 111 */       if (broadcasthashjoin_matches23 != null) {
    /* 112 */         int broadcasthashjoin_size25 = broadcasthashjoin_matches23.size();
    /* 113 */         for (int broadcasthashjoin_i24 = 0; broadcasthashjoin_i24 < broadcasthashjoin_size25; broadcasthashjoin_i24++) {
    /* 114 */           UnsafeRow broadcasthashjoin_row26 = (UnsafeRow) broadcasthashjoin_matches23.apply(broadcasthashjoin_i24);
    /* 115 */           /* input[0, bigint] */
    /* 116 */           long broadcasthashjoin_value28 = broadcasthashjoin_row26.getLong(0);
    /* 121 */           // do aggregate
    /* 127 */           long agg_value30 = -1L;
    /* 128 */           agg_value30 = agg_bufValue2 + 1L;
    /* 129 */           // update aggregation buffer
    /* 130 */
    /* 131 */           agg_bufIsNull1 = false;
    /* 132 */           agg_bufValue2 = agg_value30;
    /* 136 */         }
    /* 137 */       }
    /* 138 */
    /* 139 */
    /* 140 */       if (shouldStop()) return;
    /* 141 */     }
    /* 144 */   }
    /* 145 */
    /* 146 */
    /* 147 */   public GeneratedIterator(Object[] references) {
    /* 148 */     this.references = references;
    /* 149 */     agg_initAgg0 = false;
    /* 150 */
    /* 151 */
    /* 152 */     this.broadcasthashjoin_broadcast6 = (org.apache.spark.broadcast.TorrentBroadcast) references[0];
    /* 153 */
    /* 154 */     broadcasthashjoin_relation7 = (org.apache.spark.sql.execution.joins.UnsafeHashedRelation) broadcasthashjoin_broadcast6.value();
    /* 155 */     incPeakExecutionMemory(broadcasthashjoin_relation7.getUnsafeSize());
    /* 156 */
    /* 157 */     range_initRange8 = false;
    /* 158 */     range_partitionEnd9 = 0L;
    /* 159 */     range_number10 = 0L;
    /* 160 */     range_overflow11 = false;
    /* 161 */     broadcasthashjoin_result19 = new UnsafeRow(1);
    /* 162 */     this.broadcasthashjoin_holder20 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(broadcasthashjoin_result19, 0);
    /* 163 */     this.broadcasthashjoin_rowWriter21 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(broadcasthashjoin_holder20, 1);
    /* 164 */     agg_result37 = new UnsafeRow(1);
    /* 165 */     this.agg_holder38 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result37, 0);
    /* 166 */     this.agg_rowWriter39 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder38, 1);
    /* 167 */   }
    /* 168 */
    /* 169 */   protected void processNext() throws java.io.IOException {
    /* 170 */
    /* 171 */     if (!agg_initAgg0) {
    /* 172 */       agg_initAgg0 = true;
    /* 173 */       agg_doAggregateWithoutKey5();
    /* 174 */
    /* 175 */       // output the result
    /* 179 */       agg_rowWriter39.zeroOutNullBytes();
    /* 183 */       if (agg_bufIsNull1) {
    /* 184 */         agg_rowWriter39.setNullAt(0);
    /* 185 */       } else {
    /* 186 */         agg_rowWriter39.write(0, agg_bufValue2);
    /* 187 */       }
    /* 188 */       currentRows.add(agg_result37.copy());
    /* 190 */     }
    /* 192 */   }
    /* 193 */ }
    /* 194 */
    ```
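
Stripped of row writers and null flags, the fused loop above has a simple shape: build a hash relation once, then for each streamed `id` compute the key, probe, and bump the count once per match. The following is a hand-written sketch of that shape, assuming a plain `HashMap` in place of `UnsafeHashedRelation`; the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hand-written illustration of the fused range -> join -> count loop.
// A HashMap stands in for the broadcast hash relation (an assumption).
final class FusedJoinCount {

    static long joinCount(long values) {
        // Build side: k in [0, 1 << 16), v is k rendered as a string.
        Map<Long, List<String>> relation = new HashMap<>();
        for (long k = 0; k < (1L << 16); k++) {
            relation.computeIfAbsent(k, unused -> new ArrayList<>()).add(Long.toString(k));
        }

        long count = 0L;
        for (long id = 0; id < values; id++) {
            // Join key: id % 60000. The key is never null in this query, so the
            // null check seen in the generated code is omitted here.
            long key = id % 60000L;
            List<String> matches = relation.get(key);
            if (matches != null) {
                // The aggregate is applied inside the match loop, once per joined row.
                for (int i = 0; i < matches.size(); i++) {
                    count += 1L;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(joinCount(200_000L));
    }
}
```

Because every key in `[0, 60000)` exists exactly once on the build side, each streamed row joins with exactly one build row, so the count equals `values`.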




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-176974519
  
    **[Test build #50409 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50409/consoleFull)** for PR 10989 at commit [`f234c21`](https://github.com/apache/spark/commit/f234c21655f8ceb2f0567a170c38e1004b6cd3d9).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179042340
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177064749
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50433/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-176973970
  
    **[Test build #50409 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50409/consoleFull)** for PR 10989 at commit [`f234c21`](https://github.com/apache/spark/commit/f234c21655f8ceb2f0567a170c38e1004b6cd3d9).




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10989




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177090960
  
    **[Test build #50435 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50435/consoleFull)** for PR 10989 at commit [`dcf4fdc`](https://github.com/apache/spark/commit/dcf4fdc0b955fb5abd9686447710ecc0a76ce990).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178147379
  
    **[Test build #2486 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2486/consoleFull)** for PR 10989 at commit [`c1c0588`](https://github.com/apache/spark/commit/c1c0588053af5aa359b6d03bac6c5d0b198c5b69).




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177090808
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #10989: [SPARK-12798] [SQL] generated BroadcastHashJoin

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r222144135
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -117,6 +120,87 @@ case class BroadcastHashJoin(
           hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
         }
       }
    +
    +  // the term for hash relation
    +  private var relationTerm: String = _
    +
    +  override def upstream(): RDD[InternalRow] = {
    +    streamedPlan.asInstanceOf[CodegenSupport].upstream()
    +  }
    +
    +  override def doProduce(ctx: CodegenContext): String = {
    +    // create a name for HashRelation
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    +    relationTerm = ctx.freshName("relation")
    +    // TODO: create specialized HashRelation for single join key
    +    val clsName = classOf[UnsafeHashedRelation].getName
    +    ctx.addMutableState(clsName, relationTerm,
    +      s"""
    +         | $relationTerm = ($clsName) $broadcast.value();
    +         | incPeakExecutionMemory($relationTerm.getUnsafeSize());
    +       """.stripMargin)
    +
    +    s"""
    +       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +     """.stripMargin
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    // generate the key as UnsafeRow
    +    ctx.currentVars = input
    +    val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
    +    val keyVal = GenerateUnsafeProjection.createCode(ctx, keyExpr)
    +    val keyTerm = keyVal.value
    +    val anyNull = if (keyExpr.exists(_.nullable)) s"$keyTerm.anyNull()" else "false"
    +
    +    // find the matches from HashedRelation
    +    val matches = ctx.freshName("matches")
    +    val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +    val i = ctx.freshName("i")
    +    val size = ctx.freshName("size")
    +    val row = ctx.freshName("row")
    +
    +    // create variables for output
    +    ctx.currentVars = null
    +    ctx.INPUT_ROW = row
    +    val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    +      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +    }
    +    val resultVars = buildSide match {
    +      case BuildLeft => buildColumns ++ input
    +      case BuildRight => input ++ buildColumns
    +    }
    +
    +    val ouputCode = if (condition.isDefined) {
    +      // filter the output via condition
    +      ctx.currentVars = resultVars
    +      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +      s"""
    +         | ${ev.code}
    +         | if (!${ev.isNull} && ${ev.value}) {
    +         |   ${consume(ctx, resultVars)}
    +         | }
    +       """.stripMargin
    +    } else {
    +      consume(ctx, resultVars)
    +    }
    +
    +    s"""
    +       | // generate join key
    +       | ${keyVal.code}
    +       | // find matches from HashRelation
    +       | $bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get($keyTerm);
    +       | if ($matches != null) {
    +       |   int $size = $matches.size();
    +       |   for (int $i = 0; $i < $size; $i++) {
    --- End diff --
    
    Yeah, the code has changed a lot, but we still generate loops for the broadcast join.
    
    This PR changed `BufferedRowIterator.currentRow` to `BufferedRowIterator.currentRows`, so that it stores multiple result rows instead of a single row. If we can interrupt the loop and resume it in the next call of `processNext`, we can still keep a single result row.
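
For readers unfamiliar with the buffering approach described above, the sketch below shows why a list is needed once a single streamed row can produce several output rows within one `processNext` call. It is a simplified, hypothetical stand-in and not Spark's `BufferedRowIterator`; the names `BufferedRows` and `BufferedRowsDemo` and the string rows are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified, hypothetical stand-in; this is not Spark's BufferedRowIterator.
abstract class BufferedRows<T> {
    // A list rather than a single slot, because one call may emit many rows.
    protected final LinkedList<T> currentRows = new LinkedList<>();

    /** Appends zero or more rows; appends nothing only when the input is exhausted. */
    protected abstract void processNext();

    public boolean hasNext() {
        if (currentRows.isEmpty()) {
            processNext();
        }
        return !currentRows.isEmpty();
    }

    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return currentRows.remove(); // removes and returns the head of the list
    }
}

final class BufferedRowsDemo {
    /** One streamed key that matches three build rows: a single call buffers all three. */
    static List<String> run() {
        BufferedRows<String> it = new BufferedRows<String>() {
            private boolean done = false;

            @Override
            protected void processNext() {
                if (done) {
                    return;
                }
                done = true;
                for (String v : new String[] {"a", "b", "c"}) {
                    currentRows.add("7," + v);
                }
            }
        };
        List<String> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }
}
```

The alternative raised in this comment keeps a single row slot and instead makes the loop itself resumable, trading the buffer for extra state in the generated code.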




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177090810
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50436/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #10989: [SPARK-12798] [SQL] generated BroadcastHashJoin

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r221972056
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -117,6 +120,87 @@ case class BroadcastHashJoin(
           hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
         }
       }
    +
    +  // the term for hash relation
    +  private var relationTerm: String = _
    +
    +  override def upstream(): RDD[InternalRow] = {
    +    streamedPlan.asInstanceOf[CodegenSupport].upstream()
    +  }
    +
    +  override def doProduce(ctx: CodegenContext): String = {
    +    // create a name for HashRelation
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    +    relationTerm = ctx.freshName("relation")
    +    // TODO: create specialized HashRelation for single join key
    +    val clsName = classOf[UnsafeHashedRelation].getName
    +    ctx.addMutableState(clsName, relationTerm,
    +      s"""
    +         | $relationTerm = ($clsName) $broadcast.value();
    +         | incPeakExecutionMemory($relationTerm.getUnsafeSize());
    +       """.stripMargin)
    +
    +    s"""
    +       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +     """.stripMargin
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    // generate the key as UnsafeRow
    +    ctx.currentVars = input
    +    val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
    +    val keyVal = GenerateUnsafeProjection.createCode(ctx, keyExpr)
    +    val keyTerm = keyVal.value
    +    val anyNull = if (keyExpr.exists(_.nullable)) s"$keyTerm.anyNull()" else "false"
    +
    +    // find the matches from HashedRelation
    +    val matches = ctx.freshName("matches")
    +    val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +    val i = ctx.freshName("i")
    +    val size = ctx.freshName("size")
    +    val row = ctx.freshName("row")
    +
    +    // create variables for output
    +    ctx.currentVars = null
    +    ctx.INPUT_ROW = row
    +    val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    +      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +    }
    +    val resultVars = buildSide match {
    +      case BuildLeft => buildColumns ++ input
    +      case BuildRight => input ++ buildColumns
    +    }
    +
    +    val ouputCode = if (condition.isDefined) {
    +      // filter the output via condition
    +      ctx.currentVars = resultVars
    +      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +      s"""
    +         | ${ev.code}
    +         | if (!${ev.isNull} && ${ev.value}) {
    +         |   ${consume(ctx, resultVars)}
    +         | }
    +       """.stripMargin
    +    } else {
    +      consume(ctx, resultVars)
    +    }
    +
    +    s"""
    +       | // generate join key
    +       | ${keyVal.code}
    +       | // find matches from HashRelation
    +       | $bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get($keyTerm);
    +       | if ($matches != null) {
    +       |   int $size = $matches.size();
    +       |   for (int $i = 0; $i < $size; $i++) {
    --- End diff --
    
    Hmm, yeah, this code has changed a lot since this PR. It looks like `BroadcastHashJoin` only supported inner joins at that point. I also don't really see why we would interrupt this loop early, since it looks like we need to go through all matched rows here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51461171
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -31,22 +33,20 @@
      * TODO: replaced it by batched columnar format.
      */
     public class BufferedRowIterator {
    -  protected InternalRow currentRow;
    +  protected LinkedList<InternalRow> currentRows = new LinkedList<>();
    --- End diff --
    
    It's a huge topic, let's talk about this offline.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178179456
  
    Can you include the generated code?


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179077222
  
    **[Test build #2509 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2509/consoleFull)** for PR 10989 at commit [`e0c8c65`](https://github.com/apache/spark/commit/e0c8c652b86ce9d17bcb5d629e6b55563b5c382b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177064748
  
    Merged build finished. Test FAILed.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178809044
  
    **[Test build #50587 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50587/consoleFull)** for PR 10989 at commit [`4d75022`](https://github.com/apache/spark/commit/4d7502206d17ae3f148a620bed15eff38b35018e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177111489
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50438/
    Test PASSed.


---


[GitHub] spark pull request #10989: [SPARK-12798] [SQL] generated BroadcastHashJoin

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r221961271
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---
    @@ -117,6 +120,87 @@ case class BroadcastHashJoin(
           hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
         }
       }
    +
    +  // the term for hash relation
    +  private var relationTerm: String = _
    +
    +  override def upstream(): RDD[InternalRow] = {
    +    streamedPlan.asInstanceOf[CodegenSupport].upstream()
    +  }
    +
    +  override def doProduce(ctx: CodegenContext): String = {
    +    // create a name for HashRelation
    +    val broadcastRelation = Await.result(broadcastFuture, timeout)
    +    val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
    +    relationTerm = ctx.freshName("relation")
    +    // TODO: create specialized HashRelation for single join key
    +    val clsName = classOf[UnsafeHashedRelation].getName
    +    ctx.addMutableState(clsName, relationTerm,
    +      s"""
    +         | $relationTerm = ($clsName) $broadcast.value();
    +         | incPeakExecutionMemory($relationTerm.getUnsafeSize());
    +       """.stripMargin)
    +
    +    s"""
    +       | ${streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)}
    +     """.stripMargin
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    +    // generate the key as UnsafeRow
    +    ctx.currentVars = input
    +    val keyExpr = streamedKeys.map(BindReferences.bindReference(_, streamedPlan.output))
    +    val keyVal = GenerateUnsafeProjection.createCode(ctx, keyExpr)
    +    val keyTerm = keyVal.value
    +    val anyNull = if (keyExpr.exists(_.nullable)) s"$keyTerm.anyNull()" else "false"
    +
    +    // find the matches from HashedRelation
    +    val matches = ctx.freshName("matches")
    +    val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +    val i = ctx.freshName("i")
    +    val size = ctx.freshName("size")
    +    val row = ctx.freshName("row")
    +
    +    // create variables for output
    +    ctx.currentVars = null
    +    ctx.INPUT_ROW = row
    +    val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    +      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +    }
    +    val resultVars = buildSide match {
    +      case BuildLeft => buildColumns ++ input
    +      case BuildRight => input ++ buildColumns
    +    }
    +
    +    val ouputCode = if (condition.isDefined) {
    +      // filter the output via condition
    +      ctx.currentVars = resultVars
    +      val ev = BindReferences.bindReference(condition.get, this.output).gen(ctx)
    +      s"""
    +         | ${ev.code}
    +         | if (!${ev.isNull} && ${ev.value}) {
    +         |   ${consume(ctx, resultVars)}
    +         | }
    +       """.stripMargin
    +    } else {
    +      consume(ctx, resultVars)
    +    }
    +
    +    s"""
    +       | // generate join key
    +       | ${keyVal.code}
    +       | // find matches from HashRelation
    +       | $bufferType $matches = $anyNull ? null : ($bufferType) $relationTerm.get($keyTerm);
    +       | if ($matches != null) {
    +       |   int $size = $matches.size();
    +       |   for (int $i = 0; $i < $size; $i++) {
    --- End diff --
    
    I don't see a strong reason why we can't interrupt this loop. We could make `i` a global variable, for example.
    
    I don't mean to change anything; I just want to verify my understanding. Also cc @viirya @mgaido91 
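
The idea of interrupting the match loop and resuming it later can be illustrated with a small sketch. This is not Spark's generated code: the class `ResumableMatchLoop`, the `batchSize` parameter (a stand-in for a `shouldStop()`-style check), and the use of plain `Long` rows are all assumptions made for illustration. The point is only that keeping the loop index in a field, rather than in a local variable, lets a later call continue where the previous one returned.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Spark's generated code.
class ResumableMatchLoop {
    private final List<Long> matches; // rows matched for the current join key
    private final int batchSize;      // assumed stand-in for a shouldStop() check
    private int i = 0;                // loop index kept as a field so it survives a return

    ResumableMatchLoop(List<Long> matches, int batchSize) {
        this.matches = matches;
        this.batchSize = batchSize;
    }

    // Emits at most batchSize rows per call and resumes where the last call stopped.
    List<Long> processNext() {
        List<Long> out = new ArrayList<>();
        for (; i < matches.size(); i++) {
            if (out.size() == batchSize) {
                return out; // interrupted: i still points at the next unprocessed row
            }
            out.add(matches.get(i));
        }
        return out;
    }

    // True once every matched row has been emitted.
    boolean done() {
        return i >= matches.size();
    }
}
```

With five matched rows and a batch size of two, three calls to `processNext()` would be needed to drain the matches, and no row is skipped or emitted twice.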


---



[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179039734
  
    After rebasing onto master:
    
    ```
    public Object generate(Object[] references) {
      return new GeneratedIterator(references);
    }

    class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {

      private Object[] references;
      private boolean agg_initAgg;
      private boolean agg_bufIsNull;
      private long agg_bufValue;
      private org.apache.spark.broadcast.TorrentBroadcast broadcasthashjoin_broadcast;
      private org.apache.spark.sql.execution.joins.UnsafeHashedRelation broadcasthashjoin_relation;
      private boolean range_initRange;
      private long range_partitionEnd;
      private long range_number;
      private boolean range_overflow;
      private UnsafeRow broadcasthashjoin_result;
      private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder broadcasthashjoin_holder;
      private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter broadcasthashjoin_rowWriter;
      private UnsafeRow agg_result;
      private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;

      public GeneratedIterator(Object[] references) {
        this.references = references;
        agg_initAgg = false;

        this.broadcasthashjoin_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[0];

        broadcasthashjoin_relation = (org.apache.spark.sql.execution.joins.UnsafeHashedRelation) broadcasthashjoin_broadcast.value();
        incPeakExecutionMemory(broadcasthashjoin_relation.getUnsafeSize());

        range_initRange = false;
        range_partitionEnd = 0L;
        range_number = 0L;
        range_overflow = false;
        broadcasthashjoin_result = new UnsafeRow(1);
        this.broadcasthashjoin_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(broadcasthashjoin_result, 0);
        this.broadcasthashjoin_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(broadcasthashjoin_holder, 1);
        agg_result = new UnsafeRow(1);
        this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
        this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      }

      private void agg_doAggregateWithoutKey() throws java.io.IOException {
        // initialize aggregation buffer

        agg_bufIsNull = false;
        agg_bufValue = 0L;

        // initialize Range
        if (!range_initRange) {
          range_initRange = true;
          if (input.hasNext()) {
            initRange(((InternalRow) input.next()).getInt(0));
          } else {
            return;
          }
        }

        while (!range_overflow && range_number < range_partitionEnd) {
          long range_value = range_number;
          range_number += 1L;
          if (range_number < range_value ^ 1L < 0) {
            range_overflow = true;
          }

          // generate join key

          broadcasthashjoin_rowWriter.zeroOutNullBytes();

          /* (input[0, bigint] % 60000) */
          boolean broadcasthashjoin_isNull = false;
          long broadcasthashjoin_value = -1L;
          if (false || 60000L == 0) {
            broadcasthashjoin_isNull = true;
          } else {

            if (false) {
              broadcasthashjoin_isNull = true;
            } else {
              broadcasthashjoin_value = (long)(range_value % 60000L);
            }
          }
          if (broadcasthashjoin_isNull) {
            broadcasthashjoin_rowWriter.setNullAt(0);
          } else {
            broadcasthashjoin_rowWriter.write(0, broadcasthashjoin_value);
          }

          // find matches from HashRelation
          org.apache.spark.util.collection.CompactBuffer broadcasthashjoin_matches = broadcasthashjoin_result.anyNull() ? null : (org.apache.spark.util.collection.CompactBuffer) broadcasthashjoin_relation.get(broadcasthashjoin_result);
          if (broadcasthashjoin_matches != null) {
            int broadcasthashjoin_size = broadcasthashjoin_matches.size();
            for (int broadcasthashjoin_i = 0; broadcasthashjoin_i < broadcasthashjoin_size; broadcasthashjoin_i++) {
              UnsafeRow broadcasthashjoin_row = (UnsafeRow) broadcasthashjoin_matches.apply(broadcasthashjoin_i);
              /* input[0, bigint] */
              long broadcasthashjoin_value3 = broadcasthashjoin_row.getLong(0);

              // do aggregate
              /* (input[0, bigint] + 1) */
              long agg_value1 = -1L;
              agg_value1 = agg_bufValue + 1L;
              // update aggregation buffer
              agg_bufIsNull = false;
              agg_bufValue = agg_value1;

            }
          }

          if (shouldStop()) return;
        }

      }

      private void initRange(int idx) {
        java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
        java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
        java.math.BigInteger numElement = java.math.BigInteger.valueOf(10485760L);
        java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
        java.math.BigInteger start = java.math.BigInteger.valueOf(0L);

        java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
        if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
          range_number = Long.MAX_VALUE;
        } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
          range_number = Long.MIN_VALUE;
        } else {
          range_number = st.longValue();
        }

        java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
        .multiply(step).add(start);
        if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
          range_partitionEnd = Long.MAX_VALUE;
        } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
          range_partitionEnd = Long.MIN_VALUE;
        } else {
          range_partitionEnd = end.longValue();
        }
      }

      protected void processNext() throws java.io.IOException {
        if (!agg_initAgg) {
          agg_initAgg = true;
          agg_doAggregateWithoutKey();

          // output the result

          agg_rowWriter.zeroOutNullBytes();

          if (agg_bufIsNull) {
            agg_rowWriter.setNullAt(0);
          } else {
            agg_rowWriter.write(0, agg_bufValue);
          }
          currentRows.add(agg_result.copy());
        }
      }
    }
    ```
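
For readers who want the shape of the generated pipeline without the codegen noise, here is a heavily simplified sketch of what the code above appears to do: stream a range, compute the join key as `value % modulus`, probe a hash relation, and count matches. Everything in it is an assumption made for illustration: the class `ProbeSketch`, the use of `java.util.HashMap` in place of `UnsafeHashedRelation`, and plain `long` values in place of `UnsafeRow`. Null-key handling and the `shouldStop()` check are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of range -> hash-join probe -> count, using plain collections.
class ProbeSketch {
    // Build side: key -> all rows with that key (a HashMap standing in for the hashed relation).
    static Map<Long, List<Long>> build(long[] buildKeys) {
        Map<Long, List<Long>> relation = new HashMap<>();
        for (long k : buildKeys) {
            relation.computeIfAbsent(k, x -> new ArrayList<>()).add(k);
        }
        return relation;
    }

    // Stream side: values in [0, n), join key = value % modulus, aggregate = count of joined rows.
    static long countMatches(Map<Long, List<Long>> relation, long n, long modulus) {
        long count = 0L;
        for (long value = 0; value < n; value++) {
            long key = value % modulus;              // generate join key
            List<Long> matches = relation.get(key);  // find matches in the relation
            if (matches != null) {
                for (int i = 0; i < matches.size(); i++) {
                    count += 1L;                     // one output row per matched build row
                }
            }
        }
        return count;
    }
}
```

The inner loop visits every matched build row, which is why an inner join with duplicate build keys can produce more output rows than there are streamed rows.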


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51477107
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -54,13 +54,27 @@ public void setInput(Iterator<InternalRow> iter) {
       }
     
       /**
    +   * Returns whether `processNext()` should stop processing next row from `input` or not.
    +   */
    +  protected boolean shouldStop() {
    --- End diff --
    
    @rxin this seems like it could be used to support limit.
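
One way the limit idea could look is sketched below. This is not Spark's `BufferedRowIterator`: the classes `BufferedLongIterator` and `LimitedIterator`, the `limit` and `emitted` fields, and the base `shouldStop()` body (stop once a row is buffered) are assumptions for illustration, with rows reduced to plain `long` values. The sketch only shows that a subclass could override a `shouldStop()`-style hook so that `processNext()` stops pulling from `input` once enough rows have been produced.

```java
import java.util.Iterator;
import java.util.LinkedList;

// Hypothetical, simplified stand-in for a buffered row iterator.
abstract class BufferedLongIterator {
    protected final LinkedList<Long> currentRows = new LinkedList<>();

    // Assumed default: stop pulling input as soon as at least one row is buffered.
    protected boolean shouldStop() {
        return !currentRows.isEmpty();
    }

    // Subclasses append zero or more rows to currentRows.
    protected abstract void processNext();

    public boolean hasNext() {
        if (currentRows.isEmpty()) {
            processNext();
        }
        return !currentRows.isEmpty();
    }

    public long next() {
        return currentRows.remove();
    }
}

// Hypothetical limit operator built on the shouldStop() hook.
class LimitedIterator extends BufferedLongIterator {
    private final Iterator<Long> input;
    private final int limit;
    private int emitted = 0;

    LimitedIterator(Iterator<Long> input, int limit) {
        this.input = input;
        this.limit = limit;
    }

    @Override
    protected boolean shouldStop() {
        // Stop for good once the limit is reached, otherwise defer to the default rule.
        return emitted >= limit || super.shouldStop();
    }

    @Override
    protected void processNext() {
        while (!shouldStop() && input.hasNext()) {
            currentRows.add(input.next());
            emitted++;
        }
    }
}
```

In this sketch a limit of 3 over five input rows would consume exactly three rows from `input`, leaving the rest unread.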


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179042346
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50649/
    Test FAILed.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178133344
  
    Merged build finished. Test FAILed.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177090993
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50435/
    Test FAILed.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179040226
  
    **[Test build #50650 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50650/consoleFull)** for PR 10989 at commit [`e0c8c65`](https://github.com/apache/spark/commit/e0c8c652b86ce9d17bcb5d629e6b55563b5c382b).


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179045161
  
    **[Test build #2509 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2509/consoleFull)** for PR 10989 at commit [`e0c8c65`](https://github.com/apache/spark/commit/e0c8c652b86ce9d17bcb5d629e6b55563b5c382b).


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177091016
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50437/
    Test FAILed.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178901909
  
    **[Test build #2496 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2496/consoleFull)** for PR 10989 at commit [`4d75022`](https://github.com/apache/spark/commit/4d7502206d17ae3f148a620bed15eff38b35018e).


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177111361
  
    **[Test build #50438 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50438/consoleFull)** for PR 10989 at commit [`0139fde`](https://github.com/apache/spark/commit/0139fdeeefc2038e995c44c7e966e09e30063418).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177095797
  
    **[Test build #50438 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50438/consoleFull)** for PR 10989 at commit [`0139fde`](https://github.com/apache/spark/commit/0139fdeeefc2038e995c44c7e966e09e30063418).


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-177111484
  
    Merged build finished. Test PASSed.


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-179395364
  
    LGTM


---


[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178809121
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50587/




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178188600
  
    **[Test build #2486 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2486/consoleFull)** for PR 10989 at commit [`c1c0588`](https://github.com/apache/spark/commit/c1c0588053af5aa359b6d03bac6c5d0b198c5b69).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178953521
  
    **[Test build #2496 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2496/consoleFull)** for PR 10989 at commit [`4d75022`](https://github.com/apache/spark/commit/4d7502206d17ae3f148a620bed15eff38b35018e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10989#discussion_r51356447
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -31,22 +33,20 @@
      * TODO: replaced it by batched columnar format.
      */
     public class BufferedRowIterator {
    -  protected InternalRow currentRow;
    +  protected LinkedList<InternalRow> currentRows = new LinkedList<>();
    --- End diff --
    
    Orthogonal to this PR: my first reaction is that we should perhaps spend a week or two converting all operators to a push-based model. Otherwise, performance is going to suck big time for some operators.
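
As a rough illustration of the trade-off raised in this comment (not code from this PR), the sketch below contrasts a pull-style source that buffers output rows in a `LinkedList`, as the diff above does, with a push-style operator that hands each row straight to its consumer. All names here (`PushVsPullSketch`, `BufferedSource`, `pushJoin`) are hypothetical, and rows are plain `String`s rather than `InternalRow`.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: names and row type are assumptions, not Spark APIs.
final class PushVsPullSketch {

    // Pull-style: the producer appends output rows to a buffer and the
    // consumer drains it later through hasNext()/next().
    static final class BufferedSource {
        private final LinkedList<String> currentRows = new LinkedList<>();

        // One probe row may match several build rows, so a single input
        // can append several output rows to the buffer.
        void process(String probeRow, List<String> matches) {
            for (String match : matches) {
                currentRows.add(probeRow + "|" + match);
            }
        }

        boolean hasNext() {
            return !currentRows.isEmpty();
        }

        String next() {
            return currentRows.remove(); // removes and returns the head
        }
    }

    // Push-style: each joined row goes directly to the downstream consumer,
    // with no intermediate buffer.
    static void pushJoin(String probeRow, List<String> matches, Consumer<String> downstream) {
        for (String match : matches) {
            downstream.accept(probeRow + "|" + match);
        }
    }

    // Drives the pull-style source to completion and collects its output.
    static List<String> runPull(String probeRow, List<String> matches) {
        BufferedSource source = new BufferedSource();
        source.process(probeRow, matches);
        List<String> out = new ArrayList<>();
        while (source.hasNext()) {
            out.add(source.next());
        }
        return out;
    }

    // Drives the push-style operator and collects what it emits.
    static List<String> runPush(String probeRow, List<String> matches) {
        List<String> out = new ArrayList<>();
        pushJoin(probeRow, matches, out::add);
        return out;
    }
}
```

Both paths produce the same rows in the same order; the difference is only that the pull-style version allocates a list node per output row before the consumer sees it.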





[GitHub] spark pull request: [SPARK-12798] [SQL] generated BroadcastHashJoi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10989#issuecomment-178133351
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50495/

