Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/10/04 15:48:43 UTC

[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/22630

    [SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs

    ## What changes were proposed in this pull request?
    
    This PR is inspired by https://github.com/apache/spark/pull/22524, but picks a more aggressive fix.
    
    The current limit whole stage codegen has 2 problems:
    1. It's only applied to `InputAdapter`, so many leaf nodes can't stop early when the limit is reached.
    2. It needs to override a method, which breaks if there is more than one limit in the whole stage.
    
    The first problem is easy to fix: figure out which nodes can stop early w.r.t. the limit, and update them. This PR updates `RangeExec`, `ColumnarBatchScan`, `SortExec`, `HashAggregateExec` and `SortMergeJoinExec`.
    
    The second problem is hard to fix. This PR proposes to propagate the limit counter variable name upstream, so that the upstream leaf/blocking nodes can check the limit counter and quit the loop earlier.
    
    For better performance, the implementation here follows `CodegenSupport.needStopCheck`, so that we codegen the check only if there is a limit in the query. For columnar nodes like range, we check the limit counter per batch instead of per row, to keep the inner loop tight and fast.
    
    ## How was this patch tested?
    
    a new test
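
The propagation idea in the description can be sketched with a toy operator tree. This is a minimal illustration under stated assumptions, not Spark's implementation: `Node`, `Limit`, `Scan`, `loopCondition` and the counter names are hypothetical, and only the method name `limitNotReachedChecks` is taken from the diffs quoted later in this thread.

```scala
object LimitCheckPropagation {
  // Toy stand-in for a codegen operator (hypothetical, not Spark's CodegenSupport).
  sealed trait Node {
    def parent: Option[Node]
    // Default: inherit whatever the downstream (parent) operators require.
    def limitNotReachedChecks: Seq[String] =
      parent.map(_.limitNotReachedChecks).getOrElse(Nil)
  }

  // Each limit contributes its own "counter < n" check on top of the inherited ones.
  final case class Limit(counter: String, n: Int, parent: Option[Node]) extends Node {
    override def limitNotReachedChecks: Seq[String] =
      s"$counter < $n" +: super.limitNotReachedChecks
  }

  // A leaf node puts all inherited checks into its data producing loop condition.
  final case class Scan(parent: Option[Node]) extends Node {
    def loopCondition: String =
      ("input.hasNext()" +: limitNotReachedChecks).mkString(" && ")
  }

  def main(args: Array[String]): Unit = {
    val outer = Limit("count_0", 10, None)
    val inner = Limit("count_1", 5, Some(outer))
    // prints: input.hasNext() && count_1 < 5 && count_0 < 10
    println(Scan(Some(inner)).loopCondition)
  }
}
```

Because every limit adds a separate check instead of overriding one shared method, two limits in the same stage do not interfere with each other in this sketch.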

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark limit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22630.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22630
    
----
commit d9b54d5c6edd4f5337efb2d185dbb58f33972616
Author: Wenchen Fan <we...@...>
Date:   2018-10-03T00:00:54Z

    Limit operation within whole stage codegen should not consume all the inputs

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97109 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97109/testReport)** for PR 22630 at commit [`9114107`](https://github.com/apache/spark/commit/9114107065997c42bdae6c5ecdbcc5330415a04e).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223024087
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    You are right about the problem, but I'm not going to touch this part in this PR. Note that this PR focuses on limit handling in whole-stage codegen.
    
    Personally I feel it's OK to make the metrics a little inaccurate for better performance; we can discuss it later in other PRs.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97136 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97136/testReport)** for PR 22630 at commit [`4fc4301`](https://github.com/apache/spark/commit/4fc43010c7c466e7a3db6a08c554adc78719db76).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    LGTM


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96984 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96984/testReport)** for PR 22630 at commit [`a31f601`](https://github.com/apache/spark/commit/a31f601bedeb783fbc54b6f44f4c693a175fca11).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3795/
    Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223027028
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -518,56 +521,81 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
     
    +  private def collectNodeWithinWholeStage[T <: SparkPlan : ClassTag](plan: SparkPlan): Seq[T] = {
    +    val stages = plan.collect {
    --- End diff --
    
    We also want to detect the case where there are 2 whole-stage codegen nodes, and fail.
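
A check of this kind can be sketched on a toy plan tree. Everything below is an assumption made for illustration: `Plan`, `WholeStage`, `Op`, `collect` and `opsWithinSingleStage` are hypothetical stand-ins, not Spark's `SparkPlan` API or the helper from the diff.

```scala
object WholeStageCollect {
  // Toy plan tree (hypothetical; not Spark's SparkPlan).
  sealed trait Plan { def children: Seq[Plan] }
  final case class WholeStage(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  final case class Op(name: String, children: Seq[Plan] = Nil) extends Plan

  // Pre-order traversal that keeps every node the partial function matches.
  def collect[A](plan: Plan)(pf: PartialFunction[Plan, A]): Seq[A] =
    pf.lift(plan).toSeq ++ plan.children.flatMap(child => collect(child)(pf))

  // Fails unless the plan contains exactly one whole-stage node, then
  // returns the operators with the given name inside that stage.
  def opsWithinSingleStage(plan: Plan, name: String): Seq[Op] = {
    val stages = collect[WholeStage](plan) { case w: WholeStage => w }
    require(stages.length == 1,
      s"expected exactly one whole-stage node, found ${stages.length}")
    collect[Op](stages.head) { case o: Op if o.name == name => o }
  }
}
```

Failing loudly on two stages keeps a test from passing by accident when the operator of interest ends up in a different stage than expected.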


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97099 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97099/testReport)** for PR 22630 at commit [`d815c0b`](https://github.com/apache/spark/commit/d815c0bffc7f23325c256d436c243af94cb2a228).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223315367
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -360,6 +360,10 @@ trait CodegenSupport extends SparkPlan {
        * limit-not-reached checks.
        */
       final def limitNotReachedCond: String = {
    +    // InputAdapter is also a leaf node.
    +    val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
    +    assert(isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen],
    --- End diff --
    
    ah good idea!


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3707/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    cc @viirya @mgaido91 @kiszk 


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222956995
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,27 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  /**
    +   * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    +   * enough records and reached the limit. If current node is a data producing node, it can leverage
    +   * this information to stop producing data and complete the data flow earlier. Common data
    +   * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
    +   * These checks should be put into the loop condition of the data producing loop.
    +   */
    +  def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    +
    +  /**
    +   * A helper method to generate the data producing loop condition according to the
    +   * limit-not-reached checks.
    +   */
    +  final def limitNotReachedCond: String = {
    +    if (parent.limitNotReachedChecks.isEmpty) {
    +      ""
    +    } else {
    +      parent.limitNotReachedChecks.mkString(" && ", " && ", "")
    --- End diff --
    
    Here we are assuming that this is going to be `and`-ed with an already existing condition. I don't see a case in which this would be used in a different context as of now, but what about producing just the conditions here and putting the initial `&&` outside of this? It may be easier to reuse. WDYT?
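
The two shapes being compared can be sketched as follows. `condWithPrefix` mirrors the `mkString` call quoted in the diff; `condOnly` and `loopCond` are hypothetical names for the suggested alternative, not code from the PR.

```scala
object LimitCondShapes {
  // As in the quoted diff: the leading " && " is baked into the result,
  // so the caller must already have a condition to append it to.
  def condWithPrefix(checks: Seq[String]): String =
    if (checks.isEmpty) "" else checks.mkString(" && ", " && ", "")

  // Suggested alternative (hypothetical): return only the joined checks
  // and let the caller decide how to combine them.
  def condOnly(checks: Seq[String]): Option[String] =
    if (checks.isEmpty) None else Some(checks.mkString(" && "))

  // One possible caller: combine a base condition with the optional checks.
  def loopCond(base: String, checks: Seq[String]): String =
    (base +: condOnly(checks).toSeq).mkString(" && ")
}
```

Note that `mkString(" && ", " && ", "")` on an empty sequence returns `" && "` rather than the empty string, which is why the `isEmpty` guard is needed in the first shape.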



---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223319859
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,61 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  /**
    +   * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    +   * enough records and reached the limit. If current node is a data producing node, it can leverage
    +   * this information to stop producing data and complete the data flow earlier. Common data
    +   * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
    +   * These checks should be put into the loop condition of the data producing loop.
    +   */
    +  def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    +
    +  /**
    +   * A helper method to generate the data producing loop condition according to the
    +   * limit-not-reached checks.
    +   */
    +  final def limitNotReachedCond: String = {
    +    // InputAdapter is also a leaf node.
    +    val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
    +    if (isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen]) {
    +      val errMsg = "only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " +
    +        "in its data producing loop."
    +      if (Utils.isTesting) {
    +        throw new IllegalStateException(errMsg)
    +      } else {
    +        logWarning(errMsg)
    --- End diff --
    
    nit: shall we also ask users to report this to the community if they see it?


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223203167
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---
    @@ -132,6 +132,13 @@ case class SortExec(
       // a stop check before sorting.
       override def needStopCheck: Boolean = false
     
    +  // Sort is a blocking operator. It needs to consume all the inputs before producing any output.
    +  // This means, Limit operator after Sort will never reach its limit during the execution of Sort's
    +  // upstream operators. Here we override this method to return Nil, so that upstream operators will
    +  // not generate useless conditions (which are always evaluated to false) for the Limit operators
    +  // after Sort.
    +  override def limitNotReachedChecks: Seq[String] = Nil
    --- End diff --
    
    I am fine with doing it later, but I'd like to avoid having other places where we duplicate this logic in the future, to prevent possible mistakes.
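
One way to keep this logic in a single place is to put the override into a trait shared by all blocking operators. The sketch below is a toy model under that assumption: `Node`, `Blocking`, `Limit`, `Sort`, `Scan` and `ownLoopChecks` are hypothetical names, and only `limitNotReachedChecks` comes from the diff.

```scala
object BlockingCutsPropagation {
  trait Node {
    def parent: Option[Node]
    // What this node exposes to its upstream (child) operators.
    def limitNotReachedChecks: Seq[String] =
      parent.map(_.limitNotReachedChecks).getOrElse(Nil)
    // What this node's own data producing loop should test.
    def ownLoopChecks: Seq[String] =
      parent.map(_.limitNotReachedChecks).getOrElse(Nil)
  }

  // Shared trait: upstream operators see no limit checks past a blocking node,
  // because the limit cannot be reached before the blocking node emits output.
  trait Blocking extends Node {
    override def limitNotReachedChecks: Seq[String] = Nil
  }

  final case class Limit(check: String, parent: Option[Node]) extends Node {
    override def limitNotReachedChecks: Seq[String] =
      check +: super.limitNotReachedChecks
  }
  final case class Sort(parent: Option[Node]) extends Blocking
  final case class Scan(parent: Option[Node]) extends Node
}
```

In this model the scan below the sort generates no limit check, while the sort's own output loop still sees the check from the limit above it.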


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22630


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222726409
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
         } else {
           "// shouldStop check is eliminated"
         }
    +
    +    // An overview of the Range processing.
    +    //
    +    // For each partition, the Range task needs to produce records from partition start(inclusive)
    +    // to end(exclusive). For better performance, we separate the partition range into batches, and
    +    // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
    +    // for loop is used to iterate records inside a batch.
    +    //
    +    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
    +    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
    +    // with `nextIndex`. In the outer loop, we first check if `nextIndex == batchEnd`. If it's true,
    +    // it means the current batch is fully consumed, and we will update `batchEnd` to process the
    +    // next batch. If `batchEnd` reaches partition end, exit the outer loop. finally we enter the
    +    // inner loop. Note that, when we enter inner loop, `nextIndex` must be different from
    +    // `batchEnd`, otherwise the outer loop should already exits.
    +    //
    +    // The inner loop iterates from 0 to `localEnd`, which is calculated by
    +    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
    +    // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
    +    // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
    +    // up being equal to `batchEnd` when the inner loop finishes.
    +    //
    +    // The inner loop can be interrupted, if the query has produced at least one result row, so that
    +    // we don't buffer too many result rows and waste memory. It's ok to interrupt the inner loop,
    +    // because `nextIndex` will be updated before interrupting.
    +
         s"""
           | // initialize Range
           | if (!$initTerm) {
           |   $initTerm = true;
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    -      |   long $range = $batchEnd - $number;
    -      |   if ($range != 0L) {
    -      |     int $localEnd = (int)($range / ${step}L);
    -      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    -      |       long $value = ((long)$localIdx * ${step}L) + $number;
    -      |       ${consume(ctx, Seq(ev))}
    -      |       $shouldStop
    +      | while (true$keepProducingDataCond) {
    +      |   if ($nextIndex == $batchEnd) {
    +      |     long $nextBatchTodo;
    +      |     if ($numElementsTodo > ${batchSize}L) {
    +      |       $nextBatchTodo = ${batchSize}L;
    +      |       $numElementsTodo -= ${batchSize}L;
    +      |     } else {
    +      |       $nextBatchTodo = $numElementsTodo;
    +      |       $numElementsTodo = 0;
    +      |       if ($nextBatchTodo == 0) break;
           |     }
    -      |     $number = $batchEnd;
    +      |     $numOutput.add($nextBatchTodo);
    +      |     $inputMetrics.incRecordsRead($nextBatchTodo);
    +      |     $batchEnd += $nextBatchTodo * ${step}L;
           |   }
           |
    -      |   $taskContext.killTaskIfInterrupted();
    -      |
    -      |   long $nextBatchTodo;
    -      |   if ($numElementsTodo > ${batchSize}L) {
    -      |     $nextBatchTodo = ${batchSize}L;
    -      |     $numElementsTodo -= ${batchSize}L;
    -      |   } else {
    -      |     $nextBatchTodo = $numElementsTodo;
    -      |     $numElementsTodo = 0;
    -      |     if ($nextBatchTodo == 0) break;
    +      |   int $localEnd = (int)(($batchEnd - $nextIndex) / ${step}L);
    --- End diff --
    
    The change here simply moves the inner loop after the `batchEnd` and metrics update, so that we get correct metrics when we stop early because of a limit.
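
The reordered loop can be simulated in plain Scala. This is only a sketch of the control flow described in the diff comment, not the generated Java: the variable names follow the diff, while `produce`, `metric`, `emitted` and the way the downstream limit is modeled (a simple counter that drops surplus rows) are assumptions made for the example.

```scala
object RangeBatchLoop {
  /** Returns the values that pass the limit and the record count the metric would report. */
  def produce(start: Long, step: Long, numElements: Long,
              batchSize: Long, limit: Int): (Vector[Long], Long) = {
    var nextIndex = start
    var batchEnd = start
    var numElementsTodo = numElements
    var metric = 0L
    var emitted = 0
    var exhausted = false
    val out = Vector.newBuilder[Long]

    // Outer loop: one iteration per batch; the limit is checked only here.
    while (!exhausted && emitted < limit) {
      if (nextIndex == batchEnd) {
        val nextBatchTodo = math.min(numElementsTodo, batchSize)
        numElementsTodo -= nextBatchTodo
        if (nextBatchTodo == 0) {
          exhausted = true
        } else {
          metric += nextBatchTodo          // metrics are updated first ...
          batchEnd += nextBatchTodo * step
        }
      }
      if (!exhausted) {
        // ... then the tight inner loop runs over the batch.
        val localEnd = ((batchEnd - nextIndex) / step).toInt
        var localIdx = 0
        while (localIdx < localEnd) {
          val value = localIdx * step + nextIndex
          // Stand-in for the downstream Limit dropping surplus rows.
          if (emitted < limit) { out += value; emitted += 1 }
          localIdx += 1
        }
        nextIndex = batchEnd
      }
    }
    (out.result(), metric)
  }
}
```

With `produce(0L, 1L, 10L, 4L, 5)` the sketch returns the values 0 through 4 and a metric of 8: the limit is noticed only at the batch boundary, so the second batch of four is counted and iterated in full, and the third batch is never counted.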


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96944 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96944/testReport)** for PR 22630 at commit [`d9b54d5`](https://github.com/apache/spark/commit/d9b54d5c6edd4f5337efb2d185dbb58f33972616).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223318798
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -362,8 +362,15 @@ trait CodegenSupport extends SparkPlan {
       final def limitNotReachedCond: String = {
         // InputAdapter is also a leaf node.
         val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
    -    assert(isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen],
    -      "only leaf nodes and blocking nodes need to call this method in its data producing loop.")
    +    if (isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen]) {
    --- End diff --
    
    `if (!isLeafNode && !this.isInstanceOf[BlockingOperatorWithCodegen])`?
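
The point of the suggestion is that the removed `assert` stated the condition for valid callers, whereas an `if` that raises an error needs the negation. A small sketch, with hypothetical function names, compares the two conditions on all inputs:

```scala
object LimitCondGuard {
  // Condition as written in the diff under review: true for the VALID callers,
  // so using it to trigger the error would fire in exactly the wrong cases.
  def guardInDiff(isLeafNode: Boolean, isBlocking: Boolean): Boolean =
    isLeafNode || isBlocking

  // Condition suggested in the comment: true only for INVALID callers.
  def guardSuggested(isLeafNode: Boolean, isBlocking: Boolean): Boolean =
    !isLeafNode && !isBlocking

  def main(args: Array[String]): Unit =
    for (leaf <- Seq(true, false); blocking <- Seq(true, false)) {
      println(s"leaf=$leaf blocking=$blocking " +
        s"diff=${guardInDiff(leaf, blocking)} suggested=${guardSuggested(leaf, blocking)}")
    }
}
```

By De Morgan's law the suggested guard is the exact negation of the asserted condition, so it reports an error in precisely the cases where the old `assert` would have failed.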


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223027872
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    BTW I do have a local branch that fixes this problem; I just haven't had time to benchmark it yet. I'll send it out later and let's move the discussion there.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3790/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97109 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97109/testReport)** for PR 22630 at commit [`9114107`](https://github.com/apache/spark/commit/9114107065997c42bdae6c5ecdbcc5330415a04e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97109/
    Test FAILed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96986 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96986/testReport)** for PR 22630 at commit [`2188b27`](https://github.com/apache/spark/commit/2188b2737b1de4b401d82012bcc15c748cb164ad).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223024598
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -46,6 +46,15 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       }
     }
     
    +object BaseLimitExec {
    +  private val curId = new java.util.concurrent.atomic.AtomicInteger()
    +
    +  def newLimitCountTerm(): String = {
    +    val id = curId.getAndIncrement()
    +    s"_limit_counter_$id"
    +  }
    --- End diff --
    
    see `MapObjects.apply` as an existing example.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97136 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97136/testReport)** for PR 22630 at commit [`4fc4301`](https://github.com/apache/spark/commit/4fc43010c7c466e7a3db6a08c554adc78719db76).


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97136/
    Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223525444
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,73 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
         } else {
           "// shouldStop check is eliminated"
         }
    +    val loopCondition = if (limitNotReachedChecks.isEmpty) {
    +      "true"
    +    } else {
    +      limitNotReachedChecks.mkString(" && ")
    --- End diff --
    
    This is whole-stage codegen. If bytecode overflow happens, we will fall back.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3715/
    Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223044989
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    I am not sure why you need a benchmark for this (unless you did something different from what I suggested in the earlier comment). In that case it is a single metric update which happens only when stopping, so it shouldn't introduce any significant overhead. Am I missing something? Anyway, let's move the discussion to the next PR then, thanks.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223022650
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,27 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  /**
    +   * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    +   * enough records and reached the limit. If current node is a data producing node, it can leverage
    +   * this information to stop producing data and complete the data flow earlier. Common data
    +   * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
    +   * These checks should be put into the loop condition of the data producing loop.
    +   */
    +  def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    +
    +  /**
    +   * A helper method to generate the data producing loop condition according to the
    +   * limit-not-reached checks.
    +   */
    +  final def limitNotReachedCond: String = {
    +    if (parent.limitNotReachedChecks.isEmpty) {
    +      ""
    +    } else {
    +      parent.limitNotReachedChecks.mkString(" && ", " && ", "")
    --- End diff --
    
    Then we will have a lot of places generating the initial `&&`. If we do have a different context in the future, we can use `limitNotReachedChecks` directly.
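
As a rough illustration of the point above, the three-argument `mkString` emits the leading ` && ` once, inside the helper, so each producer can append the result to its own loop condition. The function names and the `iter.hasNext()` base condition below are hypothetical stand-ins, not the Spark trait itself.

```scala
// Toy stand-in for the helper under review; `checks` plays the role of
// parent.limitNotReachedChecks.
def limitNotReachedCond(checks: Seq[String]): String =
  if (checks.isEmpty) "" else checks.mkString(" && ", " && ", "")

// A producer appends the helper's result to whatever base condition it already has.
def loopCondition(base: String, checks: Seq[String]): String =
  s"$base${limitNotReachedCond(checks)}"

val withLimits = loopCondition(
  "iter.hasNext()", Seq("_limit_counter_0 < 10", "_limit_counter_1 < 5"))
val withoutLimits = loopCondition("iter.hasNext()", Nil)
```

With no limit in the stage the helper contributes nothing, so the generated loop condition is unchanged.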


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97099/
    Test FAILed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97099 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97099/testReport)** for PR 22630 at commit [`d815c0b`](https://github.com/apache/spark/commit/d815c0bffc7f23325c256d436c243af94cb2a228).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait BlockingOperatorWithCodegen extends CodegenSupport `


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223025094
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -66,27 +75,22 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       // to the parent operator.
       override def usedInputs: AttributeSet = AttributeSet.empty
     
    +  private lazy val countTerm = BaseLimitExec.newLimitCountTerm()
    +
    +  override lazy val limitNotReachedChecks: Seq[String] = {
    +    s"$countTerm < $limit" +: super.limitNotReachedChecks
    +  }
    +
       protected override def doProduce(ctx: CodegenContext): String = {
         child.asInstanceOf[CodegenSupport].produce(ctx, this)
       }
     
       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
    -    val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
    +    ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
    --- End diff --
    
    Because the counter variable name is decided before we obtain the `CodegenContext`. If we don't inline here, we need a way to notify the upstream operators about the counter name, which is hard to do.
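
A minimal sketch of the naming scheme discussed here, assuming a process-wide counter like the one in the diff. The object name `LimitCounterNames` is hypothetical. The point is that a name can be produced without any codegen context, so upstream operators can refer to it before code generation starts.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the companion object in the diff.
object LimitCounterNames {
  private val curId = new AtomicInteger()

  // Thread-safe: every call returns a name with a distinct numeric suffix.
  def newLimitCountTerm(): String = s"_limit_counter_${curId.getAndIncrement()}"
}

val first = LimitCounterNames.newLimitCountTerm()
val second = LimitCounterNames.newLimitCountTerm()
```

Because the name is fixed up front, the generated field has to keep exactly that name, which is why the diff passes `useFreshName = false` and `forceInline = true`.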


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96996 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96996/testReport)** for PR 22630 at commit [`e0bc621`](https://github.com/apache/spark/commit/e0bc621499021855a68a4c4e318506b16b52dd0d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3684/
    Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223478828
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    Sorry for the late comment. It would be good to discuss the details in another PR.
    
    First, I agree that benchmarking is necessary.
    1. I think that `localIdx` can be defined as a local variable outside of the loop. Or, how about storing `localIdx` in another local variable only if `parent.needStopCheck` is `true`?
    1. Since `shouldStop()` is simple and does not update any state, we expect the JIT to apply inlining and some optimizations.
    1. If we want to call `incRecordsRead`, it would be good to exit the loop using `break` and then call `incRecordsRead`.
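
The last suggestion can be sketched in plain Scala as follows. This is a toy model, not the generated Java: `Metrics`, `produceBatch`, and the `shouldStop` callback are hypothetical names, and a flag stands in for `break`, which Scala lacks as a keyword. The loop index stays in a local variable, and the metric is updated once after the loop with the number of rows actually produced.

```scala
// Hypothetical metric holder standing in for the task's input metrics.
final class Metrics { var recordsRead: Long = 0L }

// Produces rows for [start, end) and returns the index to resume from.
def produceBatch(
    start: Long,
    end: Long,
    shouldStop: Long => Boolean,
    metrics: Metrics): Long = {
  var idx = start          // local variable, kept out of object state
  var stopped = false
  while (idx < end && !stopped) {
    idx += 1               // "emit" one row
    if (shouldStop(idx)) stopped = true   // leave the loop instead of returning
  }
  metrics.recordsRead += idx - start      // single update, accurate row count
  idx
}

val metrics = new Metrics
val next = produceBatch(0L, 1000L, produced => produced >= 3, metrics)
```

Whether this shape is faster or slower than returning from inside the loop is exactly what the benchmark mentioned above would need to show.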


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    LGTM


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96996 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96996/testReport)** for PR 22630 at commit [`e0bc621`](https://github.com/apache/spark/commit/e0bc621499021855a68a4c4e318506b16b52dd0d).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222977394
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -66,27 +75,22 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       // to the parent operator.
       override def usedInputs: AttributeSet = AttributeSet.empty
     
    +  private lazy val countTerm = BaseLimitExec.newLimitCountTerm()
    +
    +  override lazy val limitNotReachedChecks: Seq[String] = {
    +    s"$countTerm < $limit" +: super.limitNotReachedChecks
    +  }
    +
       protected override def doProduce(ctx: CodegenContext): String = {
         child.asInstanceOf[CodegenSupport].produce(ctx, this)
       }
     
       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
    -    val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
    +    ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
    --- End diff --
    
    why do we need to `forceInline`?


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223474853
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,73 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
         } else {
           "// shouldStop check is eliminated"
         }
    +    val loopCondition = if (limitNotReachedChecks.isEmpty) {
    +      "true"
    +    } else {
    +      limitNotReachedChecks.mkString(" && ")
    --- End diff --
    
    nit: I am a bit afraid of a 64KB Java bytecode overflow from using `mkString`. On the other hand, I understand that this condition generation is performance sensitive.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96985 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96985/testReport)** for PR 22630 at commit [`51ce7be`](https://github.com/apache/spark/commit/51ce7be89de2a942508a939c20658b80ece9fe56).


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    This is an interesting change. I like this idea.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222855008
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,16 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  def conditionsOfKeepProducingData: Seq[String] = parent.conditionsOfKeepProducingData
    --- End diff --
    
    Can we briefly describe what these two methods are here?


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222728210
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -159,6 +159,10 @@ case class HashAggregateExec(
       // don't need a stop check before aggregating.
       override def needStopCheck: Boolean = false
     
    +  // Aggregate operator always consumes all the input rows before outputting any result, so its
    +  // upstream operators can keep producing data, even if there is a limit after Aggregate.
    --- End diff --
    
    I have not looked at this closely, but what if there is a limit before the Aggregate? In that case we should not consume all input rows.
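
The two cases raised here can be told apart with a toy operator chain. This is a sketch under assumptions: `Node`, `Limit`, `Aggregate`, and `Scan` are simplified stand-ins for the Spark operators, and the blocking operator is assumed to return no checks, as the code comment in the diff suggests. Each node inherits checks from its downstream (parent) operator unless it overrides them.

```scala
// Toy model of the check propagation; not the Spark classes themselves.
sealed trait Node {
  def parent: Option[Node]
  // Default: inherit whatever the downstream operator reports.
  def limitNotReachedChecks: Seq[String] =
    parent.map(_.limitNotReachedChecks).getOrElse(Nil)
}

final case class Limit(counter: String, limit: Int, parent: Option[Node]) extends Node {
  // A limit adds its own check in front of the downstream ones.
  override def limitNotReachedChecks: Seq[String] =
    s"$counter < $limit" +: super.limitNotReachedChecks
}

final case class Aggregate(parent: Option[Node]) extends Node {
  // Blocking operator: it must see all of its input, so downstream limits are hidden.
  override def limitNotReachedChecks: Seq[String] = Nil
}

final case class Scan(parent: Option[Node]) extends Node

// Data flow Scan -> Aggregate -> Limit: the limit comes after the aggregate.
val scanWithLimitAfterAgg = Scan(Some(Aggregate(Some(Limit("c0", 10, None)))))
// Data flow Scan -> Limit -> Aggregate: the limit comes before the aggregate.
val scanWithLimitBeforeAgg = Scan(Some(Limit("c0", 10, Some(Aggregate(None)))))
```

In this model a limit that sits before the aggregate still reaches the scan, so the scan can stop early. Only limits after the aggregate are hidden from it.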


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223468018
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -518,56 +521,81 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
     
    +  private def collectNodeWithinWholeStage[T <: SparkPlan : ClassTag](plan: SparkPlan): Seq[T] = {
    +    val stages = plan.collect {
    +      case w: WholeStageCodegenExec => w
    +    }
    +    assert(stages.length == 1, "The query plan should have one and only one whole-stage.")
    +    stages.head
    --- End diff --
    
    nit: Do we need this line?


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222991883
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -46,6 +46,15 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       }
     }
     
    +object BaseLimitExec {
    +  private val curId = new java.util.concurrent.atomic.AtomicInteger()
    +
    +  def newLimitCountTerm(): String = {
    +    val id = curId.getAndIncrement()
    +    s"_limit_counter_$id"
    +  }
    --- End diff --
    
    Can't we use `freshName`?


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96983 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96983/testReport)** for PR 22630 at commit [`7404fe9`](https://github.com/apache/spark/commit/7404fe9fc100ccd9fe141727ba058908372de0aa).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222979315
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    In this case we are not very accurate in the metrics, right? I mean we always say that we are returning a full batch, even though we have consumed fewer rows than a batch.
    
    What about updating the metrics before returning? Something like `$inputMetrics.incRecordsRead($localIdx - $localEnd);`?


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97106 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97106/testReport)** for PR 22630 at commit [`e61078b`](https://github.com/apache/spark/commit/e61078bb563c6868be02f256041a0fb5fbd7c7d7).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223202962
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    OK, let's get back to this later; this is anyway not worse than before.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96996/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3787/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96985/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96985 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96985/testReport)** for PR 22630 at commit [`51ce7be`](https://github.com/apache/spark/commit/51ce7be89de2a942508a939c20658b80ece9fe56).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223052388
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,27 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  /**
    +   * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    +   * enough records and reached the limit. If current node is a data producing node, it can leverage
    +   * this information to stop producing data and complete the data flow earlier. Common data
    +   * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
    +   * These checks should be put into the loop condition of the data producing loop.
    +   */
    +  def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    +
    +  /**
    +   * A helper method to generate the data producing loop condition according to the
    +   * limit-not-reached checks.
    +   */
    +  final def limitNotReachedCond: String = {
    +    if (parent.limitNotReachedChecks.isEmpty) {
    --- End diff --
    
    Just one thought: since we (correctly) propagate the `limitNotReachedChecks` to all the children, shall we also enforce that we are calling this only on a node which will not propagate the `limitNotReachedChecks` any further? We could use the `blocking` flag proposed in the other comment.
    
    The reason I'd like to do this is to ensure that we do not introduce the same limit condition check more than once, in more than one operator, which would be useless and may cause a (small) perf issue. WDYT?
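
To make the propagation under discussion concrete, here is a minimal, self-contained sketch. It is a toy model, not Spark's `CodegenSupport`: the names `Node`, `Root`, `Limit`, `Leaf` and the counter names are hypothetical, and the ` && `-prefixed condition string is an assumption chosen to fit the `while ($input.hasNext()$limitNotReachedCond)` usage that appears in the diffs of this thread.

```scala
object LimitChecksSketch {
  // Toy stand-in for a codegen node; only the parts relevant to limit checks.
  trait Node {
    def parent: Node
    // By default a node forwards the checks collected by its downstream (parent) nodes.
    def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    // Suffix for the data producing loop condition; empty when no limit is downstream.
    final def limitNotReachedCond: String = {
      val checks = parent.limitNotReachedChecks
      if (checks.isEmpty) "" else checks.mkString(" && ", " && ", "")
    }
  }

  // End of the chain (hypothetical): nothing downstream, hence no checks.
  object Root extends Node {
    def parent: Node = this
    override def limitNotReachedChecks: Seq[String] = Nil
  }

  // A limit puts its own check in front of the ones coming from further downstream.
  final case class Limit(limit: Int, countTerm: String, parent: Node) extends Node {
    override def limitNotReachedChecks: Seq[String] =
      s"$countTerm < $limit" +: super.limitNotReachedChecks
  }

  // A data producing leaf, e.g. a scan, that uses the condition in its loop header.
  final case class Leaf(parent: Node) extends Node {
    def loopHeader: String = s"while (input.hasNext()$limitNotReachedCond)"
  }
}
```

In this model a leaf with no limit downstream produces `while (input.hasNext())`, while a leaf below two stacked limits receives both checks in its loop condition.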


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96948 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96948/testReport)** for PR 22630 at commit [`0a6c79a`](https://github.com/apache/spark/commit/0a6c79a7474bc31344ac7ef63a953d03c6153b45).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97111 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97111/testReport)** for PR 22630 at commit [`eac31b2`](https://github.com/apache/spark/commit/eac31b26fba4790301e009671e931451baf2f71d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97106 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97106/testReport)** for PR 22630 at commit [`e61078b`](https://github.com/apache/spark/commit/e61078bb563c6868be02f256041a0fb5fbd7c7d7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait BlockingOperatorWithCodegen extends CodegenSupport `


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3716/
    Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222958800
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -705,13 +712,16 @@ case class HashAggregateExec(
     
         def outputFromRegularHashMap: String = {
           s"""
    -         |while ($iterTerm.next()) {
    +         |while ($iterTerm.next()$limitNotReachedCond) {
              |  UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
              |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
              |  $outputFunc($keyTerm, $bufferTerm);
    -         |
              |  if (shouldStop()) return;
              |}
    +         |$iterTerm.close();
    --- End diff --
    
    This is an unrelated change, right? It changes nothing in the generated code, right? I just want to double-check that I am not missing something (I see that what changes is that previously we did not do the cleanup when there was a limit operator, whereas now we do).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222975038
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
         } else {
           "// shouldStop check is eliminated"
         }
    +
    +    // An overview of the Range processing.
    +    //
    +    // For each partition, the Range task needs to produce records from partition start(inclusive)
    +    // to end(exclusive). For better performance, we separate the partition range into batches, and
    +    // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
    +    // for loop is used to iterate records inside a batch.
    +    //
    +    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
    +    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
    +    // with `nextIndex`. In the outer loop, we first check if `nextIndex == batchEnd`. If it's true,
    +    // it means the current batch is fully consumed, and we will update `batchEnd` to process the
    +    // next batch. If `batchEnd` reaches partition end, exit the outer loop. finally we enter the
    --- End diff --
    
    Capitalize `finally`.
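
The two-loop scheme described in the quoted code comment can be sketched in plain Scala as follows. This is an illustration under stated assumptions, not the generated code: the step is fixed at 1, `RangeBatchSketch`, `produce`, `generated` and `exhausted` are hypothetical names, and the downstream limit is modelled as a simple counter that is checked once per batch.

```scala
object RangeBatchSketch {
  /** Returns (rows accepted by the limit, number of rows the range generated). */
  def produce(start: Long, end: Long, batchSize: Long, limit: Int): (Vector[Long], Long) = {
    require(batchSize > 0, "batchSize must be positive")
    val accepted = Vector.newBuilder[Long]
    var count = 0          // counter owned by the (modelled) downstream limit
    var generated = 0L     // how many rows the range actually produced
    var nextIndex = start  // index of the next record to produce
    var batchEnd = nextIndex
    var exhausted = false
    // Outer loop: one iteration per batch; the limit is checked only here.
    while (!exhausted && count < limit) {
      if (nextIndex == batchEnd) {
        // Current batch fully consumed: move to the next one or stop at partition end.
        if (batchEnd >= end) exhausted = true
        else batchEnd = math.min(batchEnd + batchSize, end)
      }
      // Inner loop: tight, with no limit check per row.
      var i = nextIndex
      while (i < batchEnd) {
        generated += 1
        if (count < limit) { accepted += i; count += 1 } // the limit drops surplus rows
        i += 1
      }
      nextIndex = batchEnd
    }
    (accepted.result(), generated)
  }
}
```

With a batch size of 10 and a limit of 15, the sketch generates 20 rows and keeps 15: the per-batch check trades a few surplus rows for an inner loop without extra branches on the limit condition.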


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223203008
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,27 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  /**
    +   * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    +   * enough records and reached the limit. If current node is a data producing node, it can leverage
    +   * this information to stop producing data and complete the data flow earlier. Common data
    +   * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
    +   * These checks should be put into the loop condition of the data producing loop.
    +   */
    +  def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    +
    +  /**
    +   * A helper method to generate the data producing loop condition according to the
    +   * limit-not-reached checks.
    +   */
    +  final def limitNotReachedCond: String = {
    +    if (parent.limitNotReachedChecks.isEmpty) {
    --- End diff --
    
    >  I want to have a simple and robust framework
    
    Yes, I 100% agree; that's why I'd like to detect early all the situations which we consider impossible but which may happen in corner cases we have not considered. What I am suggesting here is to enforce this and fail in testing only, of course; in production we shouldn't do anything like that.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3706/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97101 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97101/testReport)** for PR 22630 at commit [`e61078b`](https://github.com/apache/spark/commit/e61078bb563c6868be02f256041a0fb5fbd7c7d7).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222728236
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -66,27 +75,22 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       // to the parent operator.
       override def usedInputs: AttributeSet = AttributeSet.empty
     
    +  private lazy val countTerm = BaseLimitExec.newLimitCountTerm()
    +
    +  override lazy val conditionsOfKeepProducingData: Seq[String] = {
    +    s"$countTerm < $limit" +: super.conditionsOfKeepProducingData
    --- End diff --
    
    Note that this is sub-optimal for adjacent limits, but I think it's fine as the optimizer will merge adjacent limits.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223053247
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    but `shouldStop` is called local to the loop, isn't it?


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223024194
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -46,6 +46,15 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
       }
     }
     
    +object BaseLimitExec {
    +  private val curId = new java.util.concurrent.atomic.AtomicInteger()
    +
    +  def newLimitCountTerm(): String = {
    +    val id = curId.getAndIncrement()
    +    s"_limit_counter_$id"
    +  }
    --- End diff --
    
    there is no `CodegenContext` here.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    LGTM apart from the minor comments, which we can also address later.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223182488
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,27 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  /**
    +   * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    +   * enough records and reached the limit. If current node is a data producing node, it can leverage
    +   * this information to stop producing data and complete the data flow earlier. Common data
    +   * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
    +   * These checks should be put into the loop condition of the data producing loop.
    +   */
    +  def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    +
    +  /**
    +   * A helper method to generate the data producing loop condition according to the
    +   * limit-not-reached checks.
    +   */
    +  final def limitNotReachedCond: String = {
    +    if (parent.limitNotReachedChecks.isEmpty) {
    --- End diff --
    
    It's not very useful to enforce that. The consequence is so minor that I don't think it's worth the complexity. I want to have a simple and robust framework for the limit optimization first.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222977756
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -518,56 +521,81 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
     
    +  private def collectNodeWithinWholeStage[T <: SparkPlan : ClassTag](plan: SparkPlan): Seq[T] = {
    +    val stages = plan.collect {
    --- End diff --
    
    `collectFirst`?


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97101/
    Test FAILed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96997 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96997/testReport)** for PR 22630 at commit [`dc2dfa5`](https://github.com/apache/spark/commit/dc2dfa5b42bcd597a0929aca16152e260c7f5122).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96997 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96997/testReport)** for PR 22630 at commit [`dc2dfa5`](https://github.com/apache/spark/commit/dc2dfa5b42bcd597a0929aca16152e260c7f5122).


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96986/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223002050
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala ---
    @@ -166,7 +166,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
         }
         val inputRow = if (needsUnsafeRowConversion) null else row
         s"""
    -       |while ($input.hasNext()) {
    +       |while ($input.hasNext()$limitNotReachedCond) {
    --- End diff --
    
    We can put `limitNotReachedCond` as the first condition to avoid possibly buffering a row.
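
The effect of the condition order can be illustrated with a small sketch. It assumes a hypothetical source whose `hasNext` has to fetch and buffer a row in order to answer; `CountingSource`, `run` and `fetched` are illustrative names and not part of Spark. Because `&&` short-circuits, checking the limit first means `hasNext` is never called once the limit has been reached.

```scala
object ConditionOrderSketch {
  // Hypothetical source: answering hasNext requires fetching and buffering a row.
  final class CountingSource(n: Int) extends Iterator[Int] {
    var fetched = 0
    private var buffered: Option[Int] = None
    def hasNext: Boolean = {
      if (buffered.isEmpty && fetched < n) { buffered = Some(fetched); fetched += 1 }
      buffered.nonEmpty
    }
    def next(): Int = {
      if (!hasNext) throw new NoSuchElementException("source is exhausted")
      val row = buffered.get
      buffered = None
      row
    }
  }

  /** Consumes up to `limit` rows and returns how many rows the source had to fetch. */
  def run(limit: Int, limitFirst: Boolean): Int = {
    val src = new CountingSource(100)
    var count = 0
    def limitNotReached: Boolean = count < limit
    while (if (limitFirst) limitNotReached && src.hasNext else src.hasNext && limitNotReached) {
      src.next()
      count += 1
    }
    src.fetched
  }
}
```

With a limit of 3, the `hasNext`-first order fetches a fourth row that is never consumed, while the limit-first order fetches exactly three.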


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96984/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223277348
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -360,6 +360,10 @@ trait CodegenSupport extends SparkPlan {
        * limit-not-reached checks.
        */
       final def limitNotReachedCond: String = {
    +    // InputAdapter is also a leaf node.
    +    val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
    +    assert(isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen],
    --- End diff --
    
    nit: shall we do this only if `Utils.isTesting`, and otherwise just emit a warning?
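
A minimal sketch of what this suggestion could look like, under explicit assumptions: `isTesting` is passed in as a plain flag standing in for `Utils.isTesting`, `warn` stands in for a logger, and `checkCaller` and the message text are hypothetical. It only shows the fail-in-tests, warn-otherwise split, not the actual change.

```scala
object TestingOnlyCheckSketch {
  // `isTesting` stands in for Spark's `Utils.isTesting`; `warn` stands in for a logger.
  def checkCaller(isLeafOrBlocking: Boolean, isTesting: Boolean)(warn: String => Unit): Unit = {
    // Hypothetical message text, for illustration only.
    val msg = "limitNotReachedCond called on a node that is neither a leaf nor a blocking node"
    if (!isLeafOrBlocking) {
      if (isTesting) throw new AssertionError(msg) // fail fast while testing
      else warn(msg)                               // stay lenient otherwise
    }
  }
}
```

An explicit `throw` is used instead of `assert` here so that the behaviour does not depend on whether assertions are elided at compile time.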


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96944 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96944/testReport)** for PR 22630 at commit [`d9b54d5`](https://github.com/apache/spark/commit/d9b54d5c6edd4f5337efb2d185dbb58f33972616).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222733663
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -705,13 +709,16 @@ case class HashAggregateExec(
     
         def outputFromRegularHashMap: String = {
           s"""
    -         |while ($iterTerm.next()) {
    +         |while ($iterTerm.next()$keepProducingDataCond) {
    --- End diff --
    
    Here I only add the stop check for the regular hash map. The fast hash map is small and entirely in memory, so it's OK to always output all of it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96948/
    Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3705/
    Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223182235
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---
    @@ -132,6 +132,13 @@ case class SortExec(
       // a stop check before sorting.
       override def needStopCheck: Boolean = false
     
    +  // Sort is a blocking operator. It needs to consume all the inputs before producing any output.
    +  // This means, Limit operator after Sort will never reach its limit during the execution of Sort's
    +  // upstream operators. Here we override this method to return Nil, so that upstream operators will
    +  // not generate useless conditions (which are always evaluated to false) for the Limit operators
    +  // after Sort.
    +  override def limitNotReachedChecks: Seq[String] = Nil
    --- End diff --
    
    It's only done in Sort and Aggregate currently. I don't want to overdesign it until there are more use cases.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3806/


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223182600
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    `shouldStop` is called local, but metrics updating is not.
    
    Anyway, the JVM JIT is mysterious and we need to be super careful when updating this kind of hot loop. That is to say, I'm not confident in any change to the hot loop without a benchmark.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96983/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222733405
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -159,6 +159,10 @@ case class HashAggregateExec(
       // don't need a stop check before aggregating.
       override def needStopCheck: Boolean = false
     
    +  // Aggregate operator always consumes all the input rows before outputting any result, so its
    +  // upstream operators can keep producing data, even if there is a limit after Aggregate.
    --- End diff --
    
    Let's say the query is `range -> limit -> agg -> limit`.
    
    So `agg` does consume all of its input, which comes from the first `limit`. The `range` will have a stop check w.r.t. the first `limit`, not the second `limit`. If there is no limit before `agg`, then `range` will not have a stop check.
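
The `range -> limit -> agg -> limit` scenario above can be sketched as a toy model. This is only an illustration: `RangeOp`, `AggOp`, `LimitOp`, `checksSeenBy` and the counter names `count_0` / `count_1` are hypothetical and are not Spark classes or generated names. The sketch assumes a blocking operator hides every limit that sits after it from the operators before it.

```scala
object LimitPropagationSketch {
  // Hypothetical stand-ins for physical operators, listed leaf-first in a pipeline.
  sealed trait Op
  case object RangeOp extends Op
  case object AggOp extends Op // blocking: consumes all input before producing output
  final case class LimitOp(counter: String, n: Int) extends Op

  // Limit-not-reached checks visible to the operator at `index`.
  // Only limits between this operator and the next blocking operator count.
  def checksSeenBy(pipeline: Seq[Op], index: Int): Seq[String] =
    pipeline
      .drop(index + 1)
      .takeWhile(_ != AggOp)
      .collect { case LimitOp(counter, n) => s"$counter < $n" }

  // range -> limit -> agg -> limit
  val examplePlan: Seq[Op] =
    Seq(RangeOp, LimitOp("count_0", 10), AggOp, LimitOp("count_1", 5))

  def main(args: Array[String]): Unit = {
    // The range only sees the first limit's check.
    println(checksSeenBy(examplePlan, 0))
    // The aggregate's output loop only sees the second limit's check.
    println(checksSeenBy(examplePlan, 2))
    // With no limit before the aggregate, the range sees no check at all.
    println(checksSeenBy(Seq(RangeOp, AggOp, LimitOp("count_1", 5)), 0))
  }
}
```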


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3683/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96947 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96947/testReport)** for PR 22630 at commit [`13d882a`](https://github.com/apache/spark/commit/13d882a0e1cdd7c09e31e5353297e1dcbdf7f876).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223023197
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
    @@ -705,13 +712,16 @@ case class HashAggregateExec(
     
         def outputFromRegularHashMap: String = {
           s"""
    -         |while ($iterTerm.next()) {
    +         |while ($iterTerm.next()$limitNotReachedCond) {
              |  UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
              |  UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
              |  $outputFunc($keyTerm, $bufferTerm);
    -         |
              |  if (shouldStop()) return;
              |}
    +         |$iterTerm.close();
    --- End diff --
    
    Yes, it's unrelated and is a no-op. `outputFromRowBasedMap` and `outputFromVectorizedMap` put the resource closing at the end, and I want to be consistent here.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223050740
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala ---
    @@ -132,6 +132,13 @@ case class SortExec(
       // a stop check before sorting.
       override def needStopCheck: Boolean = false
     
    +  // Sort is a blocking operator. It needs to consume all the inputs before producing any output.
    +  // This means, Limit operator after Sort will never reach its limit during the execution of Sort's
    +  // upstream operators. Here we override this method to return Nil, so that upstream operators will
    +  // not generate useless conditions (which are always evaluated to false) for the Limit operators
    +  // after Sort.
    +  override def limitNotReachedChecks: Seq[String] = Nil
    --- End diff --
    
    It seems that all blocking operators will have this behavior. Should we instead have a `blockingOperator` flag def and make this a final function that incorporates the logic?
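
One way to read the suggestion above is a shared trait that fixes the override once for every blocking operator. The following is a minimal sketch under that assumption. `ToyCodegen`, `ToyBlocking`, `ToyLimit`, `ToySort` and `ToyProject` are hypothetical names used only for illustration, and the real `CodegenSupport` trait carries far more than this.

```scala
object BlockingTraitSketch {
  // Hypothetical, stripped-down stand-in for a codegen-capable operator.
  trait ToyCodegen {
    def parent: Option[ToyCodegen]
    // Default: forward whatever the downstream (parent) operator reports.
    def limitNotReachedChecks: Seq[String] =
      parent.map(_.limitNotReachedChecks).getOrElse(Nil)
  }

  // Blocking operators never expose downstream limits to their upstream side.
  trait ToyBlocking extends ToyCodegen {
    final override def limitNotReachedChecks: Seq[String] = Nil
  }

  // A limit adds its own check in front of the ones it inherits.
  final class ToyLimit(counter: String, limit: Int, val parent: Option[ToyCodegen])
      extends ToyCodegen {
    override def limitNotReachedChecks: Seq[String] =
      s"$counter < $limit" +: super.limitNotReachedChecks
  }

  final class ToySort(val parent: Option[ToyCodegen]) extends ToyBlocking
  final class ToyProject(val parent: Option[ToyCodegen]) extends ToyCodegen

  def main(args: Array[String]): Unit = {
    val limit = new ToyLimit("count_0", 10, None)
    // project -> sort -> limit: the sort hides the limit from the project.
    println(new ToyProject(Some(new ToySort(Some(limit)))).limitNotReachedChecks)
    // project -> limit: the project forwards the limit's check.
    println(new ToyProject(Some(limit)).limitNotReachedChecks)
  }
}
```

Marking the override `final` in the trait means an individual blocking operator cannot accidentally re-expose the downstream checks.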


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96986 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96986/testReport)** for PR 22630 at commit [`2188b27`](https://github.com/apache/spark/commit/2188b2737b1de4b401d82012bcc15c748cb164ad).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96947 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96947/testReport)** for PR 22630 at commit [`13d882a`](https://github.com/apache/spark/commit/13d882a0e1cdd7c09e31e5353297e1dcbdf7f876).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    LGTM


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3680/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96947/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3785/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223474906
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,61 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  /**
    +   * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    +   * enough records and reached the limit. If current node is a data producing node, it can leverage
    +   * this information to stop producing data and complete the data flow earlier. Common data
    +   * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
    +   * These checks should be put into the loop condition of the data producing loop.
    +   */
    +  def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    +
    +  /**
    +   * A helper method to generate the data producing loop condition according to the
    +   * limit-not-reached checks.
    +   */
    +  final def limitNotReachedCond: String = {
    +    // InputAdapter is also a leaf node.
    +    val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
    +    if (!isLeafNode && !this.isInstanceOf[BlockingOperatorWithCodegen]) {
    +      val errMsg = "Only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " +
    +        "in its data producing loop."
    +      if (Utils.isTesting) {
    +        throw new IllegalStateException(errMsg)
    +      } else {
    +        logWarning(s"[BUG] $errMsg Please open a JIRA ticket to report it.")
    +      }
    +    }
    +    if (parent.limitNotReachedChecks.isEmpty) {
    +      ""
    +    } else {
    +      parent.limitNotReachedChecks.mkString("", " && ", " &&")
    --- End diff --
    
    nit: I am a bit afraid of a 64KB Java bytecode overflow caused by using `mkString`. On the other hand, I understand that this condition generation is performance sensitive.
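
To make the shape of the generated condition concrete, here is a small sketch of what the quoted `mkString("", " && ", " &&")` call renders. The helper takes the checks as a parameter instead of reading them from `parent`. `whileHeader` and the counter names are hypothetical, and the sketch assumes the rendered string is placed in front of the loop's own test, which the trailing ` &&` suggests.

```scala
object LoopCondSketch {
  // Same rendering as the quoted helper: empty when there are no checks,
  // otherwise every check joined by " && " with a trailing " &&".
  def limitNotReachedCond(checks: Seq[String]): String =
    if (checks.isEmpty) "" else checks.mkString("", " && ", " &&")

  // Hypothetical splice point: the condition is prefixed to the iterator test.
  def whileHeader(checks: Seq[String], iterTerm: String): String =
    s"while (${limitNotReachedCond(checks)} $iterTerm.next())"

  def main(args: Array[String]): Unit = {
    println(whileHeader(Seq("count_0 < 10", "count_1 < 5"), "iter"))
    println(whileHeader(Nil, "iter"))
  }
}
```

The rendered condition grows by one short comparison per limit in the stage, so its size is linear in the number of limits visible to the producing loop.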


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3792/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97111 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97111/testReport)** for PR 22630 at commit [`eac31b2`](https://github.com/apache/spark/commit/eac31b26fba4790301e009671e931451baf2f71d).


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96944/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    retest this please.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96997/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Thanks! merging to master


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97111/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96948 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96948/testReport)** for PR 22630 at commit [`0a6c79a`](https://github.com/apache/spark/commit/0a6c79a7474bc31344ac7ef63a953d03c6153b45).


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #97101 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97101/testReport)** for PR 22630 at commit [`e61078b`](https://github.com/apache/spark/commit/e61078bb563c6868be02f256041a0fb5fbd7c7d7).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait BlockingOperatorWithCodegen extends CodegenSupport `


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96984 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96984/testReport)** for PR 22630 at commit [`a31f601`](https://github.com/apache/spark/commit/a31f601bedeb783fbc54b6f44f4c693a175fca11).


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3708/


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    **[Test build #96983 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96983/testReport)** for PR 22630 at commit [`7404fe9`](https://github.com/apache/spark/commit/7404fe9fc100ccd9fe141727ba058908372de0aa).


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223319524
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
    @@ -345,6 +345,61 @@ trait CodegenSupport extends SparkPlan {
        * don't require shouldStop() in the loop of producing rows.
        */
       def needStopCheck: Boolean = parent.needStopCheck
    +
    +  /**
    +   * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    +   * enough records and reached the limit. If current node is a data producing node, it can leverage
    +   * this information to stop producing data and complete the data flow earlier. Common data
    +   * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
    +   * These checks should be put into the loop condition of the data producing loop.
    +   */
    +  def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    +
    +  /**
    +   * A helper method to generate the data producing loop condition according to the
    +   * limit-not-reached checks.
    +   */
    +  final def limitNotReachedCond: String = {
    +    // InputAdapter is also a leaf node.
    +    val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
    +    if (isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen]) {
    +      val errMsg = "only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " +
    --- End diff --
    
    nit: `Only`


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r223049448
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -452,46 +452,68 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
    -    val range = ctx.freshName("range")
         val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
    --- End diff --
    
    > Something like `$inputMetrics.incRecordsRead($localIdx - $localEnd);`?
    
    `localIdx` is purely local to the loop. If we access it outside of the loop, we need to define `localIdx` outside of the loop as well, which may carry some performance penalty. cc @kiszk
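
The trade-off discussed here, keeping the inner loop tight and doing bookkeeping per batch, can be simulated in plain Scala. This is a toy model rather than the generated Java. `run`, `batchSize` and the variable names are hypothetical, and the sketch assumes the limit counter is checked only in the outer, per-batch loop condition, as the PR description states for range.

```scala
object BatchedLimitSketch {
  // Returns (rows passed by the limit, rows produced by the range).
  def run(total: Long, batchSize: Int, limit: Int): (Int, Long) = {
    var count = 0      // stands in for the limit counter variable
    var produced = 0L  // stands in for the range's output-row metric
    var nextIndex = 0L
    // The limit check lives in the per-batch loop condition only.
    while (count < limit && nextIndex < total) {
      val batchEnd = math.min(nextIndex + batchSize, total)
      val localEnd = (batchEnd - nextIndex).toInt
      var localIdx = 0 // scoped to a single batch, never read outside it
      while (localIdx < localEnd) {
        // Downstream limit: forward the row only while under the limit.
        if (count < limit) count += 1
        localIdx += 1
      }
      nextIndex = batchEnd
      produced += localEnd // metric updated once per batch, outside the hot loop
    }
    (count, produced)
  }

  def main(args: Array[String]): Unit = {
    // The range finishes its current batch, then stops because the limit is reached.
    println(run(total = 3000L, batchSize = 1000, limit = 10))
  }
}
```

In this model the producer can overshoot the limit by at most one batch, which is the price paid for leaving the inner loop free of extra checks.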


---



[GitHub] spark issue #22630: [SPARK-25497][SQL] Limit operation within whole stage co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97106/


---



[GitHub] spark pull request #22630: [SPARK-25497][SQL] Limit operation within whole s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22630#discussion_r222727599
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -518,56 +521,81 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
     
    +  private def collectNodeWithinWholeStage[T <: SparkPlan : ClassTag](plan: SparkPlan): Seq[T] = {
    +    val stages = plan.collect {
    +      case w: WholeStageCodegenExec => w
    +    }
    +    assert(stages.length == 1, "The query plan should have one and only one whole-stage.")
    +    stages.head
    +
    +    val cls = classTag[T].runtimeClass
    +    stages.head.collect {
    +      case n if n.getClass == cls => n.asInstanceOf[T]
    +    }
    +  }
    +
       test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
         def checkFilterAndRangeMetrics(
             df: DataFrame,
             filterNumOutputs: Int,
             rangeNumOutputs: Int): Unit = {
    -      var filter: FilterExec = null
    -      var range: RangeExec = null
    -      val collectFilterAndRange: SparkPlan => Unit = {
    -        case f: FilterExec =>
    -          assert(filter == null, "the query should only have one Filter")
    -          filter = f
    -        case r: RangeExec =>
    -          assert(range == null, "the query should only have one Range")
    -          range = r
    -        case _ =>
    -      }
    -      if (SQLConf.get.wholeStageEnabled) {
    -        df.queryExecution.executedPlan.foreach {
    -          case w: WholeStageCodegenExec =>
    -            w.child.foreach(collectFilterAndRange)
    -          case _ =>
    -        }
    -      } else {
    -        df.queryExecution.executedPlan.foreach(collectFilterAndRange)
    -      }
    +      val plan = df.queryExecution.executedPlan
     
    -      assert(filter != null && range != null, "the query doesn't have Filter and Range")
    -      assert(filter.metrics("numOutputRows").value == filterNumOutputs)
    -      assert(range.metrics("numOutputRows").value == rangeNumOutputs)
    +      val filters = collectNodeWithinWholeStage[FilterExec](plan)
    +      assert(filters.length == 1, "The query plan should have one and only one Filter")
    +      assert(filters.head.metrics("numOutputRows").value == filterNumOutputs)
    +
    +      val ranges = collectNodeWithinWholeStage[RangeExec](plan)
    +      assert(ranges.length == 1, "The query plan should have one and only one Range")
    +      assert(ranges.head.metrics("numOutputRows").value == rangeNumOutputs)
         }
     
    -    val df = spark.range(0, 3000, 1, 2).toDF().filter('id % 3 === 0)
    -    val df2 = df.limit(2)
    -    Seq(true, false).foreach { wholeStageEnabled =>
    -      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageEnabled.toString) {
    -        df.collect()
    -        checkFilterAndRangeMetrics(df, filterNumOutputs = 1000, rangeNumOutputs = 3000)
    -
    -        df.queryExecution.executedPlan.foreach(_.resetMetrics())
    -        // For each partition, we get 2 rows. Then the Filter should produce 2 rows per-partition,
    -        // and Range should produce 1000 rows (one batch) per-partition. Totally Filter produces
    -        // 4 rows, and Range produces 2000 rows.
    -        df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    -        checkFilterAndRangeMetrics(df, filterNumOutputs = 4, rangeNumOutputs = 2000)
    -
    -        // Top-most limit will call `CollectLimitExec.executeCollect`, which will only run the first
    -        // task, so totally the Filter produces 2 rows, and Range produces 1000 rows (one batch).
    -        df2.collect()
    -        checkFilterAndRangeMetrics(df2, filterNumOutputs = 2, rangeNumOutputs = 1000)
    -      }
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    --- End diff --
    
    I changed the test to check whole-stage mode only. The metrics differ between whole-stage and normal mode, and the bug was only in whole-stage mode.
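
The `collectNodeWithinWholeStage` helper in the diff above first asserts that the plan contains exactly one whole-stage node, then collects the nodes inside it whose runtime class equals the requested one. A rough Java sketch of that pattern follows. `PlanNode`, `WholeStage`, `Filter`, `Range`, and `PlanCollect` are hypothetical stand-ins rather than Spark classes, and the traversal is a plain pre-order walk assumed for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for plan nodes; these are not Spark's classes.
class PlanNode {
    final List<PlanNode> children = new ArrayList<>();
    PlanNode(PlanNode... kids) {
        for (PlanNode k : kids) children.add(k);
    }
}
class WholeStage extends PlanNode { WholeStage(PlanNode... k) { super(k); } }
class Filter extends PlanNode { Filter(PlanNode... k) { super(k); } }
class Range extends PlanNode { Range(PlanNode... k) { super(k); } }

final class PlanCollect {
    // Pre-order walk that keeps nodes whose runtime class is exactly cls
    // (subclasses are deliberately not matched, mirroring `getClass == cls`).
    static <T extends PlanNode> void collect(PlanNode node, Class<T> cls, List<T> out) {
        if (node.getClass() == cls) out.add(cls.cast(node));
        for (PlanNode child : node.children) collect(child, cls, out);
    }

    // Requires exactly one whole-stage node, then searches only inside it.
    static <T extends PlanNode> List<T> collectWithinWholeStage(PlanNode plan, Class<T> cls) {
        List<WholeStage> stages = new ArrayList<>();
        collect(plan, WholeStage.class, stages);
        if (stages.size() != 1) {
            throw new IllegalStateException(
                "The query plan should have one and only one whole-stage.");
        }
        List<T> result = new ArrayList<>();
        collect(stages.get(0), cls, result);
        return result;
    }
}
```

Matching on the exact class rather than with an `instanceof`-style check means a subclass of the requested node type is not collected, which is the same choice the Scala helper makes.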


---
