Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/10/03 13:52:33 UTC

[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/22621

    [SPARK-25602][SQL] range metrics can be wrong if the result rows are not fully consumed

    ## What changes were proposed in this pull request?
    
    This is a long-standing bug. When `Range` is whole-stage codegened, it updates its metrics before producing the records of each batch. However, while producing the records of a batch, the loop can be interrupted, and then the metrics end up wrong.
    
    To fix this bug, this PR proposes to update the `Range` metrics after a batch (or the part of it that was produced, if the loop is interrupted) has been consumed.
    
    Since the bug only affects metrics, the fix is non-trivial, and it's not a regression in 2.4, I'm targeting this PR at master only.
    
    ## How was this patch tested?
    
    new tests
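
    A minimal repro sketch, adapted from the tests added in this PR (assumes a local `SparkSession` named `spark`; exact values depend on the batch size):

    ```
    import org.apache.spark.sql.execution.RangeExec
    import spark.implicits._

    // 2 partitions, 30 rows total; the filter keeps every third row.
    val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)

    // Consume only 2 rows per partition, leaving the rest of each batch unread.
    df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()

    // Without this fix, Range's numOutputRows can count rows that were never
    // produced, because the metric was updated before the batch was consumed.
    df.queryExecution.executedPlan.foreach {
      case r: RangeExec => println(r.metrics("numOutputRows").value)
      case _ =>
    }
    ```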


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark range

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22621.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22621
    
----
commit 01c1738b934ea79f2ee54fde884501140b9854e4
Author: Wenchen Fan <we...@...>
Date:   2018-10-03T05:22:07Z

    range metrics can be wrong if the result rows are not fully consumed

----


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3669/
    Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    **[Test build #96895 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96895/testReport)** for PR 22621 at commit [`454efab`](https://github.com/apache/spark/commit/454efab90098b7ea62803e9237290a7bb2da87a0).


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222341169
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    --- End diff --
    
    we just buffer more rows in `BufferedRowIterator.currentRows`; it's only about performance, IIUC.
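
    For context, a paraphrased Scala sketch of the relevant parts of `BufferedRowIterator` (the real class is Java; this is a simplification, not the exact source):

    ```
    import java.util.LinkedList
    import org.apache.spark.sql.catalyst.InternalRow

    // Generated whole-stage code calls append() for each produced row and
    // checks shouldStop() to decide whether to hand control back to the
    // consumer, which drains currentRows via the iterator's next().
    abstract class BufferedRowIteratorSketch {
      protected val currentRows = new LinkedList[InternalRow]()

      // True once at least one row is buffered. Skipping the per-row check
      // only lets more rows pile up here, hence "only about performance".
      protected def shouldStop(): Boolean = !currentRows.isEmpty

      protected def append(row: InternalRow): Unit = { currentRows.add(row) }
    }
    ```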


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    This can be a behaviour change too, since the metrics are now different. Should we update the migration guide for safety?


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222351322
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    --- End diff --
    
    I think the fact that there is a BroadcastHashJoin case doesn't mean it is generally ok to buffer more rows. If possible, we should still avoid it.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96924/
    Test FAILed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    thanks, merging to master/2.4!


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3672/
    Test PASSed.


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222342196
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    +      s"""
    +         |int $localIdx = 0;
    +         |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
    +         |  long $value = $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |  $nextIndex += ${step}L;
    +         |}
    +         |$numOutput.add($localIdx);
    +         |$inputMetrics.incRecordsRead($localIdx);
    +       """.stripMargin
    +    } else {
    +      s"""
    +         |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    +         |  long $value = ((long)$localIdx * ${step}L) + $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |}
    +         |$nextIndex = $batchEnd;
    +         |$numOutput.add($localEnd);
    +         |$inputMetrics.incRecordsRead($localEnd);
    +       """.stripMargin
    +    }
    +
    +    val loopCondition = if (parent.needStopCheck) {
    +      "!shouldStop()"
         } else {
    -      "// shouldStop check is eliminated"
    +      "true"
         }
    +
    +    // An overview of the Range processing.
    +    //
    +    // For each partition, the Range task needs to produce records from partition start(inclusive)
    +    // to end(exclusive). For better performance, we separate the partition range into batches, and
    +    // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
    +    // for loop is used to iterate records inside a batch.
    +    //
    +    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
    +    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
    +    // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
    +    // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
    +    // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
    +    // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
    +    // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
    +    // loop but just update the `batchEnd`.
    +    //
    +    // The inner loop iterates from 0 to `localEnd`, which is calculated by
    +    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
    +    // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
    +    // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
    +    // up being equal to `batchEnd` when the inner loop finishes.
    +    //
    +    // The inner loop can be interrupted, if the query has produced at least one result row, so that
    +    // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
    +    // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
    +
         s"""
           | // initialize Range
           | if (!$initTerm) {
           |   $initTerm = true;
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    -      |   long $range = $batchEnd - $number;
    +      | while ($loopCondition) {
    +      |   long $range = $batchEnd - $nextIndex;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
    -      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    -      |       long $value = ((long)$localIdx * ${step}L) + $number;
    -      |       ${consume(ctx, Seq(ev))}
    -      |       $shouldStop
    +      |     $processingLoop
    +      |   } else {
    +      |     long $nextBatchTodo;
    --- End diff --
    
    Now we don't `return` when we need to stop the loop. These lines moved into the else branch, so that we won't hit this code path when the loop is interrupted.
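
    For reference, a plain-Scala sketch of the two-loop structure described in the overview comment (a simplification: the real code is generated Java, and the batch size and stop condition come from elsewhere):

    ```
    // One-shot simulation of the generated two-loop structure. Note the
    // metric is bumped AFTER the (possibly partial) inner batch, and the
    // batchEnd advance lives in the else branch, so an interrupted loop
    // never advances past the batch it has not finished.
    def produceRange(start: Long, end: Long, step: Long, batchSize: Int,
        shouldStop: () => Boolean)(consume: Long => Unit): Long = {
      var numOutput = 0L    // stand-in for the numOutputRows SQL metric
      var nextIndex = start // next record to produce
      var batchEnd = start  // end of the current batch
      while (!shouldStop()) {
        val range = batchEnd - nextIndex
        if (range != 0L) {
          val localEnd = (range / step).toInt
          var localIdx = 0
          while (localIdx < localEnd && !shouldStop()) {
            consume(nextIndex)
            nextIndex += step
            localIdx += 1
          }
          numOutput += localIdx // count exactly what was produced
        } else {
          val todo = math.min(batchSize.toLong, (end - batchEnd) / step)
          if (todo <= 0L) return numOutput // partition done
          batchEnd += todo * step
        }
      }
      numOutput
    }
    ```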


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3673/
    Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    **[Test build #96927 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96927/testReport)** for PR 22621 at commit [`3b9b41f`](https://github.com/apache/spark/commit/3b9b41f29e4819d63097645ed81d42b6fcde0b5d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3657/
    Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    **[Test build #96924 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96924/testReport)** for PR 22621 at commit [`3b9b41f`](https://github.com/apache/spark/commit/3b9b41f29e4819d63097645ed81d42b6fcde0b5d).


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    **[Test build #96927 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96927/testReport)** for PR 22621 at commit [`3b9b41f`](https://github.com/apache/spark/commit/3b9b41f29e4819d63097645ed81d42b6fcde0b5d).


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    cc @kiszk @viirya @mgaido91 


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222320450
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -250,7 +250,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
           val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
           val bos = new ByteArrayOutputStream()
           val out = new DataOutputStream(codec.compressedOutputStream(bos))
    -      while (iter.hasNext && (n < 0 || count < n)) {
    +      // `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
    +      // not hit.
    +      while ((n < 0 || count < n) && iter.hasNext) {
    --- End diff --
    
    nice catch this one!
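
    To see why the operand order matters, a self-contained sketch with a hypothetical `PrefetchingIterator` (not a Spark class) whose `hasNext` pre-fetches a row, much like the generated whole-stage iterator does:

    ```
    // Hypothetical iterator whose hasNext() produces (and buffers) the next
    // element from the underlying source as a side effect.
    class PrefetchingIterator(source: Iterator[Long]) extends Iterator[Long] {
      private var buffered: Option[Long] = None
      var produced = 0L // how many rows the source actually produced

      override def hasNext: Boolean = {
        if (buffered.isEmpty && source.hasNext) {
          buffered = Some(source.next()) // side effect: a row is produced
          produced += 1
        }
        buffered.nonEmpty
      }

      override def next(): Long = {
        val v = buffered.get
        buffered = None
        v
      }
    }

    val it = new PrefetchingIterator(Iterator.range(0, 10).map(_.toLong))
    var count = 0
    val n = 3
    // The old order `it.hasNext && count < n` would call hasNext a 4th time
    // after the limit is hit, producing a row that is never returned.
    // Checking the limit first avoids that extra production.
    while (count < n && it.hasNext) { it.next(); count += 1 }
    assert(it.produced == 3)
    ```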


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96920/
    Test PASSed.


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222510592
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    --- End diff --
    
    > mmmh, but localIdx would become localEnd then, right? So the UTs you added would fail, or am I missing something?
    
    You can pull my PR and try; the whole-stage codegen is a little convoluted. `localIdx` is local and always starts at 0, while `localEnd` is derived from the global `nextIndex`.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    **[Test build #96920 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96920/testReport)** for PR 22621 at commit [`1c94d13`](https://github.com/apache/spark/commit/1c94d13895f4537111518e5b26075395bcc250b0).


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222330998
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    +      s"""
    +         |int $localIdx = 0;
    +         |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
    +         |  long $value = $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |  $nextIndex += ${step}L;
    +         |}
    +         |$numOutput.add($localIdx);
    +         |$inputMetrics.incRecordsRead($localIdx);
    +       """.stripMargin
    +    } else {
    +      s"""
    +         |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    +         |  long $value = ((long)$localIdx * ${step}L) + $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |}
    +         |$nextIndex = $batchEnd;
    +         |$numOutput.add($localEnd);
    +         |$inputMetrics.incRecordsRead($localEnd);
    +       """.stripMargin
    +    }
    +
    +    val loopCondition = if (parent.needStopCheck) {
    +      "!shouldStop()"
         } else {
    -      "// shouldStop check is eliminated"
    +      "true"
         }
    +
    +    // An overview of the Range processing.
    +    //
    +    // For each partition, the Range task needs to produce records from partition start(inclusive)
    +    // to end(exclusive). For better performance, we separate the partition range into batches, and
    +    // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
    +    // for loop is used to iterate records inside a batch.
    +    //
    +    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
    +    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
    +    // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
    +    // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
    +    // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
    +    // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
    +    // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
    +    // loop but just update the `batchEnd`.
    +    //
    +    // The inner loop iterates from 0 to `localEnd`, which is calculated by
    +    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
    +    // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
    +    // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
    +    // up being equal to `batchEnd` when the inner loop finishes.
    +    //
    +    // The inner loop can be interrupted, if the query has produced at least one result row, so that
    +    // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
    +    // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
    +
         s"""
           | // initialize Range
           | if (!$initTerm) {
           |   $initTerm = true;
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    -      |   long $range = $batchEnd - $number;
    +      | while ($loopCondition) {
    +      |   long $range = $batchEnd - $nextIndex;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
    -      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    -      |       long $value = ((long)$localIdx * ${step}L) + $number;
    -      |       ${consume(ctx, Seq(ev))}
    -      |       $shouldStop
    +      |     $processingLoop
    +      |   } else {
    +      |     long $nextBatchTodo;
    --- End diff --
    
    why did you move these lines into the else branch?


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    I simplified this PR to focus on `SparkPlan.getByteArrayRdd` only. I will submit a PR to fix `Range` later.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    **[Test build #96895 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96895/testReport)** for PR 22621 at commit [`454efab`](https://github.com/apache/spark/commit/454efab90098b7ea62803e9237290a7bb2da87a0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    why do we need a migration guide for a bug fix?


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96927/
    Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    That's my point. Why do we have to document a fix for unexpected results?


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222345983
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    +      s"""
    +         |int $localIdx = 0;
    +         |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
    +         |  long $value = $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |  $nextIndex += ${step}L;
    +         |}
    +         |$numOutput.add($localIdx);
    +         |$inputMetrics.incRecordsRead($localIdx);
    +       """.stripMargin
    +    } else {
    +      s"""
    +         |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    +         |  long $value = ((long)$localIdx * ${step}L) + $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |}
    +         |$nextIndex = $batchEnd;
    +         |$numOutput.add($localEnd);
    +         |$inputMetrics.incRecordsRead($localEnd);
    +       """.stripMargin
    +    }
    +
    +    val loopCondition = if (parent.needStopCheck) {
    +      "!shouldStop()"
         } else {
    -      "// shouldStop check is eliminated"
    +      "true"
         }
    +
    +    // An overview of the Range processing.
    +    //
    +    // For each partition, the Range task needs to produce records from partition start(inclusive)
    +    // to end(exclusive). For better performance, we separate the partition range into batches, and
    +    // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
    +    // for loop is used to iterate records inside a batch.
    +    //
    +    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
    +    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
    +    // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
    +    // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
    +    // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
    +    // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
    +    // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
    +    // loop but just update the `batchEnd`.
    +    //
    +    // The inner loop iterates from 0 to `localEnd`, which is calculated by
    +    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
    +    // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
    +    // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
    +    // up being equal to `batchEnd` when the inner loop finishes.
    +    //
    +    // The inner loop can be interrupted, if the query has produced at least one result row, so that
    +    // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
    +    // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
    +
         s"""
           | // initialize Range
           | if (!$initTerm) {
           |   $initTerm = true;
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    -      |   long $range = $batchEnd - $number;
    +      | while ($loopCondition) {
    +      |   long $range = $batchEnd - $nextIndex;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
    -      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    -      |       long $value = ((long)$localIdx * ${step}L) + $number;
    -      |       ${consume(ctx, Seq(ev))}
    -      |       $shouldStop
    +      |     $processingLoop
    +      |   } else {
    +      |     long $nextBatchTodo;
    --- End diff --
    
    I see, but this way we loop 2 more times in the outer loop, because we go into either the if or the else, while previously we did both in the same iteration, IIUC. I don't think it is a big issue, but it may introduce a (probably very small) overhead compared to the previous code.
    
    If IIUC, in the first iteration we now just go to the else branch, since `batchEnd` is initialized to `nextIndex`. Do you think it is feasible to move this block before the inner loop? That would solve both issues, right?


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222642107
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -517,4 +517,57 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       test("writing data out metrics with dynamic partition: parquet") {
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
    +
    +  test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
    +    def checkFilterAndRangeMetrics(
    +        df: DataFrame,
    +        filterNumOutputs: Int,
    +        rangeNumOutputs: Int): Unit = {
    +      var filter: FilterExec = null
    --- End diff --
    
    In the future, if we need to catch more nodes, we should abstract it. But for now it's only Range and Filter, so I think it's ok.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96895/
    Test PASSed.


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222343539
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    --- End diff --
    
    mmmh, but `localIdx` would become `localEnd` then, right? So the UTs you added would fail, or am I missing something?


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222510006
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       test("writing data out metrics with dynamic partition: parquet") {
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
    +
    +  test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
    +    val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      df.collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      df.collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
    +        case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      df.queryExecution.executedPlan.foreach(_.resetMetrics())
    +      // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
    +      // produce 4 rows(0, 1, 2, 3).
    +      df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
    +            // should be 4 * 2.
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      df.queryExecution.executedPlan.foreach(_.resetMetrics())
    +      // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
    +      // produce 4 rows(0, 1, 2, 3).
    +      df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    +      df.queryExecution.executedPlan.foreach {
    +        // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
    +        // should be 4 * 2.
    +        case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
    +        case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
    +        case _ =>
    +      }
    +    }
    +
    +    val df2 = df.limit(2)
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
    +      // Range produces 4 rows(0, 1, 2, 3).
    +      df2.collect()
    +      df2.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
    --- End diff --
    
    yes


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22621


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    LGTM


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222368401
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       test("writing data out metrics with dynamic partition: parquet") {
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
    +
    +  test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
    +    val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      df.collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      df.collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
    +        case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      df.queryExecution.executedPlan.foreach(_.resetMetrics())
    +      // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
    +      // produce 4 rows(0, 1, 2, 3).
    +      df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
    +            // should be 4 * 2.
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      df.queryExecution.executedPlan.foreach(_.resetMetrics())
    +      // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
    +      // produce 4 rows(0, 1, 2, 3).
    +      df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
    +      df.queryExecution.executedPlan.foreach {
    +        // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
    +        // should be 4 * 2.
    +        case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
    +        case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
    +        case _ =>
    +      }
    +    }
    +
    +    val df2 = df.limit(2)
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
    +      // Range produces 4 rows(0, 1, 2, 3).
    +      df2.collect()
    +      df2.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L)
    +            case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L)
    +            case _ =>
    +          }
    +
    +        case _ =>
    +      }
    +    }
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
    +      // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
    --- End diff --
    
    Does first task mean first partition?


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    **[Test build #96924 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96924/testReport)** for PR 22621 at commit [`3b9b41f`](https://github.com/apache/spark/commit/3b9b41f29e4819d63097645ed81d42b6fcde0b5d).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222334931
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    --- End diff --
    
    if we don't, then we would consume more rows than needed, wouldn't we?


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222319861
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    --- End diff --
    
    This is the motivation for bringing up the discussion at https://github.com/apache/spark/pull/10989#discussion_r221961271
    
    If it's OK not to interrupt the loop and to buffer result rows for join, I think it's also OK here.


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222354699
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -397,7 +397,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         // within a batch, while the code in the outer loop is setting batch parameters and updating
         // the metrics.
    --- End diff --
    
    This comment should be updated too.


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222337687
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       test("writing data out metrics with dynamic partition: parquet") {
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
    +
    +  test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
    +    val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
    +
    +    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
    +      df.collect()
    +      df.queryExecution.executedPlan.foreach {
    +        case w: WholeStageCodegenExec =>
    +          w.child.foreach {
    +            case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
    --- End diff --
    
    not a big issue, but if we later change things and these nodes are no longer here, we would not run the asserts at all. I would suggest collecting the `FilterExec` and the `RangeExec`, enforcing that we collected exactly one of each, and then asserting on them. What do you think?
    
    Moreover, nit: would it be possible to dedup the code here? The tests are very similar with codegen on and off; only the way the two exec nodes are collected differs...


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    retest this please


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    retest this please.


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222601659
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -517,4 +517,57 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       test("writing data out metrics with dynamic partition: parquet") {
         testMetricsDynamicPartition("parquet", "parquet", "t1")
       }
    +
    +  test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
    +    def checkFilterAndRangeMetrics(
    +        df: DataFrame,
    +        filterNumOutputs: Int,
    +        rangeNumOutputs: Int): Unit = {
    +      var filter: FilterExec = null
    --- End diff --
    
    what about something like this:
    ```
    def collectExecNode[T](pf: PartialFunction[SparkPlan, T]): PartialFunction[SparkPlan, T] = {
      pf.orElse {
        case w: WholeStageCodegenExec =>
          w.child.collect(pf).head
      }
    }

    val range = df.queryExecution.executedPlan.collectFirst(
      collectExecNode { case r: RangeExec => r })
    val filter = df.queryExecution.executedPlan.collectFirst(
      collectExecNode { case f: FilterExec => f })
    ```
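
    For what it's worth, a possible follow-up on the two collected options, reusing `filterNumOutputs` and `rangeNumOutputs` from the test helper's signature above (a sketch, not tested):

    ```
    // Unwrap the Option results and assert on the metrics; this fails loudly
    // if either node is missing from the plan.
    assert(filter.isDefined && range.isDefined)
    assert(filter.get.metrics("numOutputRows").value == filterNumOutputs)
    assert(range.get.metrics("numOutputRows").value == rangeNumOutputs)
    ```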


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    ok to test


---



[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222520483
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    +      s"""
    +         |int $localIdx = 0;
    +         |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
    +         |  long $value = $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |  $nextIndex += ${step}L;
    +         |}
    +         |$numOutput.add($localIdx);
    +         |$inputMetrics.incRecordsRead($localIdx);
    +       """.stripMargin
    +    } else {
    +      s"""
    +         |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    +         |  long $value = ((long)$localIdx * ${step}L) + $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |}
    +         |$nextIndex = $batchEnd;
    +         |$numOutput.add($localEnd);
    +         |$inputMetrics.incRecordsRead($localEnd);
    +       """.stripMargin
    +    }
    +
    +    val loopCondition = if (parent.needStopCheck) {
    +      "!shouldStop()"
         } else {
    -      "// shouldStop check is eliminated"
    +      "true"
         }
    +
    +    // An overview of the Range processing.
    +    //
    +    // For each partition, the Range task needs to produce records from the partition start
    +    // (inclusive) to the end (exclusive). For better performance, we separate the partition range
    +    // into batches, and use 2 loops to produce data. The outer while loop iterates over batches,
    +    // and the inner for loop iterates over the records inside a batch.
    +    //
    +    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
    +    // with the partition start. `batchEnd` tracks the end index of the current batch, initialized
    +    // with `nextIndex`. In the outer loop, we first check whether `batchEnd - nextIndex` is
    +    // non-zero. Note that it can be negative, because the range step can be negative. If it is
    +    // non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
    +    // batch. If `batchEnd` reaches the partition end, we exit the outer loop. Since `batchEnd` is
    +    // initialized with `nextIndex`, the first iteration of the outer loop will not enter the
    +    // inner loop but just updates `batchEnd`.
    +    //
    +    // The inner loop iterates from 0 to `localEnd`, which is calculated by
    +    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is initialized with `nextIndex` and
    +    // increased by `nextBatchTodo * step` in the outer loop, `batchEnd - nextIndex` is always
    +    // divisible by `step`. `nextIndex` is increased by `step` on each iteration, and ends up
    +    // equal to `batchEnd` when the inner loop finishes.
    +    //
    +    // The inner loop can be interrupted if the query has produced at least one result row, so
    +    // that we don't buffer too many result rows and waste memory. It is OK to interrupt the
    +    // inner loop, because `nextIndex` is updated on every iteration and remembers how far we
    +    // have processed.
    +
         s"""
           | // initialize Range
           | if (!$initTerm) {
           |   $initTerm = true;
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    -      |   long $range = $batchEnd - $number;
    +      | while ($loopCondition) {
    +      |   long $range = $batchEnd - $nextIndex;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
    -      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    -      |       long $value = ((long)$localIdx * ${step}L) + $number;
    -      |       ${consume(ctx, Seq(ev))}
    -      |       $shouldStop
    +      |     $processingLoop
    +      |   } else {
    +      |     long $nextBatchTodo;
    --- End diff --
    
    good idea!
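
    For readers following the archive, a minimal plain-Scala sketch of the batching scheme described in the comment above (the names and the `batchSize` clamping are assumptions; the real generated Java derives the batch size from the range):

    ```
    // Sketch of the two-loop structure, under the assumptions stated above.
    def produce(
        partitionStart: Long,
        partitionEnd: Long,
        step: Long,
        shouldStop: () => Boolean,
        consume: Long => Unit): Long = {
      var numOutput = 0L             // stands in for the SQL metric
      var nextIndex = partitionStart // index of the next record to produce
      var batchEnd = nextIndex       // first outer iteration only advances batchEnd
      val batchSize = 1000L          // assumed constant batch size
      var done = false
      while (!done && !shouldStop()) {
        val range = batchEnd - nextIndex
        if (range != 0L) {
          // Inner loop: produce one batch. nextIndex advances per record, so an
          // interrupted batch still leaves nextIndex (and the metric) consistent.
          val localEnd = (range / step).toInt
          var localIdx = 0
          while (localIdx < localEnd && !shouldStop()) {
            consume(nextIndex)
            nextIndex += step
            localIdx += 1
          }
          numOutput += localIdx // metric updated AFTER the (possibly partial) batch
        } else if (nextIndex == partitionEnd) {
          done = true // batchEnd reached the partition end
        } else {
          // Set up the next batch, clamped to the records left in the partition.
          val remaining = (partitionEnd - nextIndex) / step
          batchEnd += math.min(batchSize, remaining) * step
        }
      }
      numOutput
    }
    ```

    Calling `produce(0, 30, 1, () => false, _ => ())` returns 30, while a `shouldStop` that flips to true mid-batch returns only the records actually consumed, which is the invariant this PR restores.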


---



[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22621
  
    **[Test build #96920 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96920/testReport)** for PR 22621 at commit [`1c94d13`](https://github.com/apache/spark/commit/1c94d13895f4537111518e5b26075395bcc250b0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
