Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/10/03 13:52:33 UTC
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
GitHub user cloud-fan opened a pull request:
https://github.com/apache/spark/pull/22621
[SPARK-25602][SQL] range metrics can be wrong if the result rows are not fully consumed
## What changes were proposed in this pull request?
This is a long-standing bug. When `Range` is whole-stage codegened, it updates the metrics before producing the records of each batch. However, the loop that produces a batch's records can be interrupted, and if the remaining rows are never consumed, the metrics report more rows than were actually produced.
To fix this bug, this PR proposes to update the `Range` metrics after a batch (or the part of it that was processed, if the loop is interrupted) is consumed.
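To make the bug concrete, here is a minimal, hypothetical Java model (not Spark code) of one batch whose loop can stop early. `produceBatch`, `batchSize`, and the `limit`-based early exit are illustrative assumptions standing in for the generated code and `shouldStop()`. It contrasts adding the whole batch to the metric up front with adding only the rows actually produced after the loop.

```java
// Hypothetical model of one Range batch (not Spark code): the loop may stop early,
// standing in for shouldStop() becoming true when downstream needs no more rows.
class RangeMetricSketch {
    // Returns {rowsConsumed, metricValue} for a batch of `batchSize` rows when
    // downstream stops after `limit` rows.
    static long[] produceBatch(int batchSize, int limit, boolean updateMetricFirst) {
        long metric = 0;
        if (updateMetricFirst) {
            metric += batchSize; // old behavior: count the whole batch before producing it
        }
        int produced = 0;
        for (; produced < batchSize && produced < limit; produced++) {
            // consume(row) would run here
        }
        if (!updateMetricFirst) {
            metric += produced; // fixed behavior: count only the rows actually produced
        }
        return new long[] {produced, metric};
    }

    public static void main(String[] args) {
        long[] before = produceBatch(1000, 1, true);
        long[] after = produceBatch(1000, 1, false);
        // prints "update first: consumed=1 metric=1000"
        System.out.println("update first: consumed=" + before[0] + " metric=" + before[1]);
        // prints "update after: consumed=1 metric=1"
        System.out.println("update after: consumed=" + after[0] + " metric=" + after[1]);
    }
}
```

With a downstream limit of 1 row, both variants consume a single row, but only the second reports a metric that matches it.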
Since the bug only affects metrics, the fix is non-trivial, and it is not a regression in 2.4, I'm targeting this PR at master only.
## How was this patch tested?
new tests
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/cloud-fan/spark range
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22621.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #22621
----
commit 01c1738b934ea79f2ee54fde884501140b9854e4
Author: Wenchen Fan <we...@...>
Date: 2018-10-03T05:22:07Z
range metrics can be wrong if the result rows are not fully consumed
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3669/
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Merged build finished. Test FAILed.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22621
**[Test build #96895 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96895/testReport)** for PR 22621 at commit [`454efab`](https://github.com/apache/spark/commit/454efab90098b7ea62803e9237290a7bb2da87a0).
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222341169
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
--- End diff --
We would just buffer more rows in `BufferedRowIterator.currentRows`; IIUC it's only about performance.
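A rough sketch of the trade-off being discussed, assuming a simplified push-based iterator over a range with step 1, loosely modeled on the `BufferedRowIterator` idea. The class below, its fields, and the `stopCheckInBatch` flag are hypothetical; this is not Spark's implementation. Both variants return the same rows; the in-batch stop check only changes how many rows sit in the buffer at once.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical push-based iterator over [start, end), loosely modeled on the
// BufferedRowIterator idea (not Spark code). processNext() pushes rows into a buffer.
class BufferedRangeSketch {
    private final Deque<Long> currentRows = new ArrayDeque<>();
    private final long end;
    private final int batchSize;
    private final boolean stopCheckInBatch;
    private long nextIndex;
    int maxBuffered = 0; // largest buffer size observed, for illustration

    BufferedRangeSketch(long start, long end, int batchSize, boolean stopCheckInBatch) {
        this.nextIndex = start;
        this.end = end;
        this.batchSize = batchSize;
        this.stopCheckInBatch = stopCheckInBatch;
    }

    // Stop as soon as at least one row is buffered.
    private boolean shouldStop() {
        return !currentRows.isEmpty();
    }

    // Pushes up to one batch of rows; with the in-batch check it stops after one row.
    private void processNext() {
        long batchEnd = Math.min(end, nextIndex + batchSize);
        while (nextIndex < batchEnd) {
            if (stopCheckInBatch && shouldStop()) {
                break; // nextIndex remembers where to resume
            }
            currentRows.add(nextIndex++);
        }
        maxBuffered = Math.max(maxBuffered, currentRows.size());
    }

    boolean hasNext() {
        if (currentRows.isEmpty()) {
            processNext();
        }
        return !currentRows.isEmpty();
    }

    long next() {
        return currentRows.poll();
    }

    public static void main(String[] args) {
        for (boolean check : new boolean[] {true, false}) {
            BufferedRangeSketch it = new BufferedRangeSketch(0, 10_000, 1000, check);
            long rows = 0;
            while (it.hasNext()) {
                it.next();
                rows++;
            }
            System.out.println("stopCheckInBatch=" + check + " rows=" + rows
                + " maxBuffered=" + it.maxBuffered);
        }
    }
}
```

In this model both runs yield 10,000 rows, but the buffer peaks at 1 row with the in-batch check and at a full batch (1,000 rows) without it, which is the memory cost raised in the next comment.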
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:
https://github.com/apache/spark/pull/22621
Arguably this could also be a behavior change, since the metrics now change. Should we update the migration guide to be safe?
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222351322
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
--- End diff --
I think the fact that there is a BroadcastHashJoin case doesn't mean it is generally OK to buffer more rows. Where possible, we should still avoid it.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96924/
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/22621
thanks, merging to master/2.4!
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3672/
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222342196
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
+ s"""
+ |int $localIdx = 0;
+ |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
+ | long $value = $nextIndex;
+ | ${consume(ctx, Seq(ev))}
+ | $nextIndex += ${step}L;
+ |}
+ |$numOutput.add($localIdx);
+ |$inputMetrics.incRecordsRead($localIdx);
+ """.stripMargin
+ } else {
+ s"""
+ |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+ | long $value = ((long)$localIdx * ${step}L) + $nextIndex;
+ | ${consume(ctx, Seq(ev))}
+ |}
+ |$nextIndex = $batchEnd;
+ |$numOutput.add($localEnd);
+ |$inputMetrics.incRecordsRead($localEnd);
+ """.stripMargin
+ }
+
+ val loopCondition = if (parent.needStopCheck) {
+ "!shouldStop()"
} else {
- "// shouldStop check is eliminated"
+ "true"
}
+
+ // An overview of the Range processing.
+ //
+ // For each partition, the Range task needs to produce records from partition start(inclusive)
+ // to end(exclusive). For better performance, we separate the partition range into batches, and
+ // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
+ // for loop is used to iterate records inside a batch.
+ //
+ // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
+ // with partition start. `batchEnd` tracks the end index of the current batch, initialized
+ // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
+ // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
+ // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
+ // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
+ // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
+ // loop but just update the `batchEnd`.
+ //
+ // The inner loop iterates from 0 to `localEnd`, which is calculated by
+ // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
+ // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
+ // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
+ // up being equal to `batchEnd` when the inner loop finishes.
+ //
+ // The inner loop can be interrupted, if the query has produced at least one result row, so that
+ // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
+ // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
+
s"""
| // initialize Range
| if (!$initTerm) {
| $initTerm = true;
| $initRangeFuncName(partitionIndex);
| }
|
- | while (true) {
- | long $range = $batchEnd - $number;
+ | while ($loopCondition) {
+ | long $range = $batchEnd - $nextIndex;
| if ($range != 0L) {
| int $localEnd = (int)($range / ${step}L);
- | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
- | long $value = ((long)$localIdx * ${step}L) + $number;
- | ${consume(ctx, Seq(ev))}
- | $shouldStop
+ | $processingLoop
+ | } else {
+ | long $nextBatchTodo;
--- End diff --
We no longer `return` when we need to stop the loop, so I moved these lines into the `else` branch; that way we won't hit this code path when the loop is interrupted.
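The loop structure described in the diff's overview comment can be sketched as a hand-written Java model. This is a sketch under stated assumptions, not the generated code: `RangeLoopSketch`, `produce(limit)`, and the per-call row `limit` standing in for `shouldStop()` are hypothetical, and `partitionEnd` is assumed to be `start + numRows * step` so that every batch boundary stays aligned with `step`. It shows that `localIdx` restarts at 0 on each call, that `localEnd` is recomputed from the shared `nextIndex`, and that the metric is updated after the possibly interrupted inner loop, with batch advancement only in the `else` branch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hand-written model of the Range batching loop (not the generated code).
// A per-call row limit stands in for shouldStop().
class RangeLoopSketch {
    final long partitionEnd; // exclusive; assumed aligned: start + numRows * step
    final long step;
    final int batchSize;
    long nextIndex;          // value of the next row to consume
    long batchEnd;           // end of the current batch, initialized to nextIndex
    long numOutput = 0;      // metric: rows actually consumed
    final List<Long> out = new ArrayList<>();

    RangeLoopSketch(long start, long step, long numRows, int batchSize) {
        this.partitionEnd = start + numRows * step;
        this.step = step;
        this.batchSize = batchSize;
        this.nextIndex = start;
        this.batchEnd = start;
    }

    // Produces rows until the partition is exhausted or `limit` rows were consumed in this call.
    void produce(int limit) {
        int consumed = 0;
        while (consumed < limit) {                    // loopCondition: !shouldStop()
            long range = batchEnd - nextIndex;        // negative when step is negative
            if (range != 0L) {
                int localEnd = (int) (range / step);  // exact: batch ends stay aligned to step
                int localIdx = 0;                     // local: restarts at 0 on every call
                for (; localIdx < localEnd && consumed < limit; localIdx++) {
                    out.add(nextIndex);               // consume(value)
                    consumed++;
                    nextIndex += step;                // remembers progress if interrupted
                }
                numOutput += localIdx;                // metric updated after the inner loop
            } else {
                long remaining = (partitionEnd - batchEnd) / step;
                if (remaining == 0) {
                    break;                            // partition exhausted
                }
                long nextBatchTodo = Math.min(remaining, batchSize);
                batchEnd += nextBatchTodo * step;     // start the next batch
            }
        }
    }

    public static void main(String[] args) {
        RangeLoopSketch r = new RangeLoopSketch(0, 2, 10, 4);
        r.produce(3);                 // interrupted in the middle of the first batch
        System.out.println(r.out + " numOutput=" + r.numOutput); // [0, 2, 4] numOutput=3
        r.produce(Integer.MAX_VALUE); // resumes from nextIndex and finishes the partition
        System.out.println(r.out + " numOutput=" + r.numOutput);
    }
}
```

After the interrupted call the metric is 3, matching the three rows emitted; the resumed call computes a `localEnd` of 1 for the rest of the first batch, and the metric ends at 10 for the full partition. A negative `step` works the same way, because `range` and `step` then share a sign.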
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3673/
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22621
**[Test build #96927 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96927/testReport)** for PR 22621 at commit [`3b9b41f`](https://github.com/apache/spark/commit/3b9b41f29e4819d63097645ed81d42b6fcde0b5d).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3657/
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22621
**[Test build #96924 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96924/testReport)** for PR 22621 at commit [`3b9b41f`](https://github.com/apache/spark/commit/3b9b41f29e4819d63097645ed81d42b6fcde0b5d).
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22621
**[Test build #96927 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96927/testReport)** for PR 22621 at commit [`3b9b41f`](https://github.com/apache/spark/commit/3b9b41f29e4819d63097645ed81d42b6fcde0b5d).
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/22621
cc @kiszk @viirya @mgaido91
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222320450
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -250,7 +250,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
val bos = new ByteArrayOutputStream()
val out = new DataOutputStream(codec.compressedOutputStream(bos))
- while (iter.hasNext && (n < 0 || count < n)) {
+ // `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
+ // not hit.
+ while ((n < 0 || count < n) && iter.hasNext) {
--- End diff --
Nice catch on this one!
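Why the order of the two conditions matters can be shown with a small, hypothetical Java model: an iterator that, like a codegen'd iterator, does the upstream work (and bumps a stand-in metric) inside `hasNext()` by buffering one row. `CountingIterator` and `TakeSketch` are illustrative names, not Spark classes; only the two `while` conditions mirror the diff above.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical iterator that does its upstream work inside hasNext() by buffering one row,
// and counts every row it produces (a stand-in for an upstream numOutputRows metric).
class CountingIterator implements Iterator<Long> {
    private final long end;
    private long cursor = 0;
    private Long buffered = null;
    long produced = 0;

    CountingIterator(long end) {
        this.end = end;
    }

    @Override
    public boolean hasNext() {
        if (buffered == null && cursor < end) {
            buffered = cursor++; // producing a row happens here, not in next()
            produced++;
        }
        return buffered != null;
    }

    @Override
    public Long next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Long row = buffered;
        buffered = null;
        return row;
    }
}

class TakeSketch {
    // Original order: hasNext() runs even after `n` rows were taken, producing one extra row.
    static long takeOld(CountingIterator iter, int n) {
        int count = 0;
        while (iter.hasNext() && (n < 0 || count < n)) {
            iter.next();
            count++;
        }
        return iter.produced;
    }

    // Patched order: check the limit first, so hasNext() is only called when a row is needed.
    static long takeNew(CountingIterator iter, int n) {
        int count = 0;
        while ((n < 0 || count < n) && iter.hasNext()) {
            iter.next();
            count++;
        }
        return iter.produced;
    }

    public static void main(String[] args) {
        System.out.println("old order produced " + takeOld(new CountingIterator(100), 1)); // 2
        System.out.println("new order produced " + takeNew(new CountingIterator(100), 1)); // 1
    }
}
```

Because `&&` short-circuits, putting the cheap limit check first means that once `n` rows are taken, `hasNext()` is never called again, so no extra upstream row is produced or counted.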
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96920/
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222510592
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
--- End diff --
> mmmh, but localIdx would become localEnd then, right? So the UTs you added would fail, or am I missing something?
You can pull my PR and try it; the whole-stage codegen is a little convoluted. `localIdx` is local and always starts at 0, while `localEnd` is determined by the global `nextIndex`.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22621
**[Test build #96920 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96920/testReport)** for PR 22621 at commit [`1c94d13`](https://github.com/apache/spark/commit/1c94d13895f4537111518e5b26075395bcc250b0).
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222330998
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
+ s"""
+ |int $localIdx = 0;
+ |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
+ | long $value = $nextIndex;
+ | ${consume(ctx, Seq(ev))}
+ | $nextIndex += ${step}L;
+ |}
+ |$numOutput.add($localIdx);
+ |$inputMetrics.incRecordsRead($localIdx);
+ """.stripMargin
+ } else {
+ s"""
+ |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+ | long $value = ((long)$localIdx * ${step}L) + $nextIndex;
+ | ${consume(ctx, Seq(ev))}
+ |}
+ |$nextIndex = $batchEnd;
+ |$numOutput.add($localEnd);
+ |$inputMetrics.incRecordsRead($localEnd);
+ """.stripMargin
+ }
+
+ val loopCondition = if (parent.needStopCheck) {
+ "!shouldStop()"
} else {
- "// shouldStop check is eliminated"
+ "true"
}
+
+ // An overview of the Range processing.
+ //
+ // For each partition, the Range task needs to produce records from partition start(inclusive)
+ // to end(exclusive). For better performance, we separate the partition range into batches, and
+ // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
+ // for loop is used to iterate records inside a batch.
+ //
+ // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
+ // with partition start. `batchEnd` tracks the end index of the current batch, initialized
+ // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
+ // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
+ // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
+ // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
+ // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
+ // loop but just update the `batchEnd`.
+ //
+ // The inner loop iterates from 0 to `localEnd`, which is calculated by
+ // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
+ // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
+ // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
+ // up being equal to `batchEnd` when the inner loop finishes.
+ //
+ // The inner loop can be interrupted, if the query has produced at least one result row, so that
+ // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
+ // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
+
s"""
| // initialize Range
| if (!$initTerm) {
| $initTerm = true;
| $initRangeFuncName(partitionIndex);
| }
|
- | while (true) {
- | long $range = $batchEnd - $number;
+ | while ($loopCondition) {
+ | long $range = $batchEnd - $nextIndex;
| if ($range != 0L) {
| int $localEnd = (int)($range / ${step}L);
- | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
- | long $value = ((long)$localIdx * ${step}L) + $number;
- | ${consume(ctx, Seq(ev))}
- | $shouldStop
+ | $processingLoop
+ | } else {
+ | long $nextBatchTodo;
--- End diff --
Why did you move these lines into the `else` branch?
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/22621
I simplified this PR to focus on `SparkPlan.getByteArrayRdd` only. I will submit a separate PR to fix range later.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22621
**[Test build #96895 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96895/testReport)** for PR 22621 at commit [`454efab`](https://github.com/apache/spark/commit/454efab90098b7ea62803e9237290a7bb2da87a0).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/22621
Why do we need a migration guide entry for a bug fix?
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96927/
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:
https://github.com/apache/spark/pull/22621
That's my point. Why do we have to document a fix for unexpected results?
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222345983
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
+ s"""
+ |int $localIdx = 0;
+ |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
+ | long $value = $nextIndex;
+ | ${consume(ctx, Seq(ev))}
+ | $nextIndex += ${step}L;
+ |}
+ |$numOutput.add($localIdx);
+ |$inputMetrics.incRecordsRead($localIdx);
+ """.stripMargin
+ } else {
+ s"""
+ |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+ | long $value = ((long)$localIdx * ${step}L) + $nextIndex;
+ | ${consume(ctx, Seq(ev))}
+ |}
+ |$nextIndex = $batchEnd;
+ |$numOutput.add($localEnd);
+ |$inputMetrics.incRecordsRead($localEnd);
+ """.stripMargin
+ }
+
+ val loopCondition = if (parent.needStopCheck) {
+ "!shouldStop()"
} else {
- "// shouldStop check is eliminated"
+ "true"
}
+
+ // An overview of the Range processing.
+ //
+ // For each partition, the Range task needs to produce records from partition start(inclusive)
+ // to end(exclusive). For better performance, we separate the partition range into batches, and
+ // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
+ // for loop is used to iterate records inside a batch.
+ //
+ // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
+ // with partition start. `batchEnd` tracks the end index of the current batch, initialized
+ // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
+ // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
+ // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
+ // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
+ // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
+ // loop but just update the `batchEnd`.
+ //
+ // The inner loop iterates from 0 to `localEnd`, which is calculated by
+ // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
+ // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
+ // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
+ // up being equal to `batchEnd` when the inner loop finishes.
+ //
+ // The inner loop can be interrupted, if the query has produced at least one result row, so that
+ // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
+ // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
+
s"""
| // initialize Range
| if (!$initTerm) {
| $initTerm = true;
| $initRangeFuncName(partitionIndex);
| }
|
- | while (true) {
- | long $range = $batchEnd - $number;
+ | while ($loopCondition) {
+ | long $range = $batchEnd - $nextIndex;
| if ($range != 0L) {
| int $localEnd = (int)($range / ${step}L);
- | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
- | long $value = ((long)$localIdx * ${step}L) + $number;
- | ${consume(ctx, Seq(ev))}
- | $shouldStop
+ | $processingLoop
+ | } else {
+ | long $nextBatchTodo;
--- End diff --
I see, but this way the outer loop runs roughly twice as many times, because each iteration now goes into either the if or the else branch, while previously both happened in the same iteration, IIUC. I don't think it is a big issue, but it may introduce a (probably very small) overhead compared to before.
IIUC, the first iteration now just goes to the else branch, since `batchEnd` is initialized to `nextIndex`. Do you think it is feasible to move this block before the inner loop? That would solve both issues, right?
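The iteration-count difference can be checked with a hypothetical Java model of the two outer-loop layouts. No interruption is modeled, the inner loop is reduced to advancing `nextIndex`, and the method names are illustrative, not from the PR. With 10 rows and batches of 4, the if/else layout runs the outer loop 7 times and the advance-first layout 4 times.

```java
// Hypothetical comparison of two outer-loop layouts for the Range batching loop
// (no interruption modeled; end assumed aligned to step; not Spark code).
class OuterLoopLayouts {
    // Layout in the PR: each outer iteration EITHER consumes a batch OR advances batchEnd.
    static int ifElseLayout(long start, long step, long numRows, int batchSize) {
        long end = start + numRows * step, nextIndex = start, batchEnd = start;
        int outer = 0;
        while (true) {
            outer++;
            long range = batchEnd - nextIndex;
            if (range != 0L) {
                long localEnd = range / step;
                for (long i = 0; i < localEnd; i++) nextIndex += step; // consume rows
            } else {
                long remaining = (end - batchEnd) / step;
                if (remaining == 0) break;
                batchEnd += Math.min(remaining, batchSize) * step;
            }
        }
        return outer;
    }

    // Suggested layout: advance batchEnd first when the batch is exhausted, then consume.
    static int advanceFirstLayout(long start, long step, long numRows, int batchSize) {
        long end = start + numRows * step, nextIndex = start, batchEnd = start;
        int outer = 0;
        while (true) {
            outer++;
            if (batchEnd == nextIndex) {
                long remaining = (end - batchEnd) / step;
                if (remaining == 0) break;
                batchEnd += Math.min(remaining, batchSize) * step;
            }
            long localEnd = (batchEnd - nextIndex) / step;
            for (long i = 0; i < localEnd; i++) nextIndex += step; // consume rows
        }
        return outer;
    }

    public static void main(String[] args) {
        System.out.println("if/else: " + ifElseLayout(0, 1, 10, 4));             // 7
        System.out.println("advance-first: " + advanceFirstLayout(0, 1, 10, 4)); // 4
    }
}
```

Each extra iteration in the if/else layout costs only a subtraction and a branch, which is consistent with the overhead being very small.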
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222642107
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,57 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
test("writing data out metrics with dynamic partition: parquet") {
testMetricsDynamicPartition("parquet", "parquet", "t1")
}
+
+ test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
+ def checkFilterAndRangeMetrics(
+ df: DataFrame,
+ filterNumOutputs: Int,
+ rangeNumOutputs: Int): Unit = {
+ var filter: FilterExec = null
--- End diff --
If we need to catch more nodes in the future, we should abstract this. But for now it's only Range and Filter, so I think it's OK.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96895/
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222343539
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
--- End diff --
mmmh, but `localIdx` would become `localEnd` then, right? So the UTs you added would fail, or am I missing something?
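A toy model of this question in plain Scala (hypothetical names, not Spark code). Two assumptions: the whole partition is treated as a single batch, and `shouldStop()` is modeled as "the parent has a result row buffered". As in the diff, the Range metric is updated after the inner loop with `localIdx`. With the stop check, `take(2)` on the partition `[0, 15)` records 4 Range rows; without it, `localIdx` runs all the way to `localEnd` and all 15 rows are counted.

```scala
import scala.collection.mutable

// Toy model, not Spark code: `buffer` stands in for rows the parent produced
// but has not handed out yet; shouldStop() is assumed to mean "buffer non-empty".
final class RangeBatchModel(values: IndexedSeq[Long], withStopCheck: Boolean) {
  private var nextIdx = 0                 // plays the role of `nextIndex`
  val buffer: mutable.Queue[Long] = mutable.Queue.empty[Long]
  var numOutputRows = 0L                  // Range metric, updated after the inner loop

  private def shouldStop(): Boolean = buffer.nonEmpty

  // One call = one pass of the inner loop over the (single) batch.
  // The parent is modeled as a filter keeping ids divisible by 3.
  def processNext(): Unit = {
    val localEnd = values.length - nextIdx
    var localIdx = 0
    while (localIdx < localEnd && !(withStopCheck && shouldStop())) {
      val v = values(nextIdx)
      if (v % 3 == 0L) buffer.enqueue(v)
      nextIdx += 1
      localIdx += 1
    }
    numOutputRows += localIdx             // like `$numOutput.add($localIdx)`
  }
}

// Pull up to n filtered rows, refilling the buffer only when it is empty.
def takeRows(model: RangeBatchModel, n: Int): Vector[Long] = {
  val taken = Vector.newBuilder[Long]
  var count = 0
  var exhausted = false
  while (count < n && !exhausted) {
    if (model.buffer.isEmpty) model.processNext()
    if (model.buffer.isEmpty) exhausted = true
    else { taken += model.buffer.dequeue(); count += 1 }
  }
  taken.result()
}

val withCheck = new RangeBatchModel(0L until 15L, withStopCheck = true)
println(takeRows(withCheck, 2))   // Vector(0, 3)
println(withCheck.numOutputRows)  // 4
```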
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222510006
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
test("writing data out metrics with dynamic partition: parquet") {
testMetricsDynamicPartition("parquet", "parquet", "t1")
}
+
+ test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
+ val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+ df.collect()
+ df.queryExecution.executedPlan.foreach {
+ case w: WholeStageCodegenExec =>
+ w.child.foreach {
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
+ case _ =>
+ }
+
+ case _ =>
+ }
+ }
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ df.collect()
+ df.queryExecution.executedPlan.foreach {
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
+ case _ =>
+ }
+ }
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+ df.queryExecution.executedPlan.foreach(_.resetMetrics())
+ // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
+ // produce 4 rows(0, 1, 2, 3).
+ df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
+ df.queryExecution.executedPlan.foreach {
+ case w: WholeStageCodegenExec =>
+ w.child.foreach {
+ // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
+ // should be 4 * 2.
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
+ case _ =>
+ }
+
+ case _ =>
+ }
+ }
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ df.queryExecution.executedPlan.foreach(_.resetMetrics())
+ // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
+ // produce 4 rows(0, 1, 2, 3).
+ df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
+ df.queryExecution.executedPlan.foreach {
+ // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
+ // should be 4 * 2.
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
+ case _ =>
+ }
+ }
+
+ val df2 = df.limit(2)
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+ // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
+ // Range produces 4 rows(0, 1, 2, 3).
+ df2.collect()
+ df2.queryExecution.executedPlan.foreach {
+ case w: WholeStageCodegenExec =>
+ w.child.foreach {
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L)
+ case _ =>
+ }
+
+ case _ =>
+ }
+ }
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
--- End diff --
yes
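The expected metric values in the test can be re-derived by hand. The sketch below assumes that `spark.range(0, 30, 1, 2)` splits into the partitions `[0, 15)` and `[15, 30)`, that an interrupted scan stops right after the last row its consumer needs, and (as confirmed in this thread) that the first task of `limit(2)` scans only the first partition. `partitions` and `rangeRowsNeeded` are hypothetical helpers, not Spark APIs.

```scala
// Assumption: [start, end) is split into numSlices contiguous, near-equal
// partitions. Hypothetical helper, not Spark's code.
def partitions(start: Long, end: Long, numSlices: Int): Seq[Seq[Long]] = {
  val n = end - start
  (0 until numSlices).map { i =>
    (start + n * i / numSlices) until (start + n * (i + 1) / numSlices)
  }
}

// For one partition, returns (Filter rows, Range rows) when the consumer wants
// `limit` rows with id % 3 == 0, assuming the scan stops right after the last
// matching row it needs, or exhausts the partition.
def rangeRowsNeeded(part: Seq[Long], limit: Int): (Long, Long) = {
  val taken = part.zipWithIndex.filter { case (v, _) => v % 3 == 0L }.take(limit)
  val rangeRows =
    if (taken.size < limit) part.size.toLong // partition exhausted
    else if (taken.isEmpty) 0L               // limit == 0
    else taken.last._2 + 1L                  // stop right after the last needed match
  (taken.size.toLong, rangeRows)
}

val parts = partitions(0L, 30L, 2)

// df.collect(): every row of both partitions is consumed.
val fullFilter = parts.map(_.count(_ % 3 == 0L)).sum   // 10
val fullRange = parts.map(_.size).sum                  // 30

// mapPartitions(_.take(2)): each partition stops after 2 Filter outputs.
val take2 = parts.map(p => rangeRowsNeeded(p, 2))
val take2Filter = take2.map(_._1).sum                  // 4
val take2Range = take2.map(_._2).sum                   // 8

// df.limit(2): assumed to run only the first partition.
val (limitFilter, limitRange) = rangeRowsNeeded(parts.head, 2) // (2, 4)
```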
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/22621
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/22621
LGTM
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222368401
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
test("writing data out metrics with dynamic partition: parquet") {
testMetricsDynamicPartition("parquet", "parquet", "t1")
}
+
+ test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
+ val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+ df.collect()
+ df.queryExecution.executedPlan.foreach {
+ case w: WholeStageCodegenExec =>
+ w.child.foreach {
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
+ case _ =>
+ }
+
+ case _ =>
+ }
+ }
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ df.collect()
+ df.queryExecution.executedPlan.foreach {
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
+ case _ =>
+ }
+ }
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+ df.queryExecution.executedPlan.foreach(_.resetMetrics())
+ // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
+ // produce 4 rows(0, 1, 2, 3).
+ df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
+ df.queryExecution.executedPlan.foreach {
+ case w: WholeStageCodegenExec =>
+ w.child.foreach {
+ // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
+ // should be 4 * 2.
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
+ case _ =>
+ }
+
+ case _ =>
+ }
+ }
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ df.queryExecution.executedPlan.foreach(_.resetMetrics())
+ // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
+ // produce 4 rows(0, 1, 2, 3).
+ df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
+ df.queryExecution.executedPlan.foreach {
+ // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
+ // should be 4 * 2.
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
+ case _ =>
+ }
+ }
+
+ val df2 = df.limit(2)
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+ // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
+ // Range produces 4 rows(0, 1, 2, 3).
+ df2.collect()
+ df2.queryExecution.executedPlan.foreach {
+ case w: WholeStageCodegenExec =>
+ w.child.foreach {
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L)
+ case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L)
+ case _ =>
+ }
+
+ case _ =>
+ }
+ }
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
--- End diff --
Does first task mean first partition?
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22621
**[Test build #96924 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96924/testReport)** for PR 22621 at commit [`3b9b41f`](https://github.com/apache/spark/commit/3b9b41f29e4819d63097645ed81d42b6fcde0b5d).
* This patch **fails due to an unknown error code, -9**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222334931
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
--- End diff --
If we don't, then we would consume more rows than needed, wouldn't we?
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222319861
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
--- End diff --
This is why I brought up the discussion at https://github.com/apache/spark/pull/10989#discussion_r221961271
If it's OK to not interrupt the loop and buffer result rows for join, I think it's also OK here.
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222354699
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -397,7 +397,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
// within a batch, while the code in the outer loop is setting batch parameters and updating
// the metrics.
--- End diff --
This comment should be updated too.
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222337687
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
test("writing data out metrics with dynamic partition: parquet") {
testMetricsDynamicPartition("parquet", "parquet", "t1")
}
+
+ test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
+ val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
+
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+ df.collect()
+ df.queryExecution.executedPlan.foreach {
+ case w: WholeStageCodegenExec =>
+ w.child.foreach {
+ case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
--- End diff --
Not a big issue, but if we later change things and these nodes are no longer here, the asserts would silently not run. I suggest collecting the `FilterExec` and the `RangeExec`, checking that we collected exactly one of each, and then asserting on them. What do you think?
Moreover, nit: would it be possible to dedup the code here? The tests are very similar with codegen on and off; only the way the two exec nodes are collected differs...
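A plain-Scala sketch of the suggested pattern. It uses hypothetical stand-in node types (`RangeNode`, `FilterNode`, `CodegenStage`) rather than Spark's operators, so it runs without Spark: collect every match, require exactly one, and share a single checker between the codegen-on and codegen-off plans.

```scala
// Hypothetical stand-ins for physical operators (not Spark classes).
sealed trait Plan { def children: Seq[Plan] }
final case class RangeNode(numOutputRows: Long) extends Plan {
  def children: Seq[Plan] = Nil
}
final case class FilterNode(child: Plan, numOutputRows: Long) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class CodegenStage(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

// Pre-order search over the whole tree, including inside codegen stages.
def collectAll[T](plan: Plan)(pf: PartialFunction[Plan, T]): Seq[T] =
  pf.lift(plan).toSeq ++ plan.children.flatMap(c => collectAll(c)(pf))

// Fail unless exactly one node matches, so a plan change cannot make the
// metric assertions silently skip.
def single[T](plan: Plan)(pf: PartialFunction[Plan, T]): T = {
  val found = collectAll(plan)(pf)
  require(found.size == 1, s"expected exactly one match, found ${found.size}")
  found.head
}

// One checker shared by the codegen-on and codegen-off variants of the test.
def checkFilterAndRangeMetrics(plan: Plan, filterRows: Long, rangeRows: Long): Unit = {
  val filter = single(plan) { case f: FilterNode => f }
  val range = single(plan) { case r: RangeNode => r }
  assert(filter.numOutputRows == filterRows, s"Filter: ${filter.numOutputRows}")
  assert(range.numOutputRows == rangeRows, s"Range: ${range.numOutputRows}")
}

val codegenOn = CodegenStage(FilterNode(RangeNode(30L), 10L))
val codegenOff = FilterNode(RangeNode(30L), 10L)
Seq(codegenOn, codegenOff).foreach(p => checkFilterAndRangeMetrics(p, 10L, 30L))
```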
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:
https://github.com/apache/spark/pull/22621
retest this please
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/22621
retest this please.
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd shou...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222601659
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
@@ -517,4 +517,57 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
test("writing data out metrics with dynamic partition: parquet") {
testMetricsDynamicPartition("parquet", "parquet", "t1")
}
+
+ test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
+ def checkFilterAndRangeMetrics(
+ df: DataFrame,
+ filterNumOutputs: Int,
+ rangeNumOutputs: Int): Unit = {
+ var filter: FilterExec = null
--- End diff --
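What about something like this:
```scala
def collectExecNode[T](pf: PartialFunction[SparkPlan, T]): PartialFunction[SparkPlan, T] = {
  pf.orElse[SparkPlan, T] {
    // Only claim a codegen stage if it actually contains a match, so that
    // `collectFirst` keeps searching instead of failing on an empty result.
    case w: WholeStageCodegenExec if w.child.collectFirst(pf).isDefined =>
      w.child.collectFirst(pf).get
  }
}
val range = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case r: RangeExec => r })
val filter = df.queryExecution.executedPlan.collectFirst(
  collectExecNode { case f: FilterExec => f })
```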
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22621
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] range metrics can be wrong if the res...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/22621
ok to test
---
[GitHub] spark pull request #22621: [SPARK-25602][SQL] range metrics can be wrong if ...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22621#discussion_r222520483
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
- val shouldStop = if (parent.needStopCheck) {
- s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+ val processingLoop = if (parent.needStopCheck) {
+ // TODO (cloud-fan): do we really need to do the stop check within batch?
+ s"""
+ |int $localIdx = 0;
+ |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
+ | long $value = $nextIndex;
+ | ${consume(ctx, Seq(ev))}
+ | $nextIndex += ${step}L;
+ |}
+ |$numOutput.add($localIdx);
+ |$inputMetrics.incRecordsRead($localIdx);
+ """.stripMargin
+ } else {
+ s"""
+ |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+ | long $value = ((long)$localIdx * ${step}L) + $nextIndex;
+ | ${consume(ctx, Seq(ev))}
+ |}
+ |$nextIndex = $batchEnd;
+ |$numOutput.add($localEnd);
+ |$inputMetrics.incRecordsRead($localEnd);
+ """.stripMargin
+ }
+
+ val loopCondition = if (parent.needStopCheck) {
+ "!shouldStop()"
} else {
- "// shouldStop check is eliminated"
+ "true"
}
+
+ // An overview of the Range processing.
+ //
+ // For each partition, the Range task needs to produce records from partition start(inclusive)
+ // to end(exclusive). For better performance, we separate the partition range into batches, and
+ // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
+ // for loop is used to iterate records inside a batch.
+ //
+ // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
+ // with partition start. `batchEnd` tracks the end index of the current batch, initialized
+ // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
+ // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
+ // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
+ // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
+ // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
+ // loop but just update the `batchEnd`.
+ //
+ // The inner loop iterates from 0 to `localEnd`, which is calculated by
+ // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
+ // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
+ // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
+ // up being equal to `batchEnd` when the inner loop finishes.
+ //
+ // The inner loop can be interrupted, if the query has produced at least one result row, so that
+ // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
+ // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
+
s"""
| // initialize Range
| if (!$initTerm) {
| $initTerm = true;
| $initRangeFuncName(partitionIndex);
| }
|
- | while (true) {
- | long $range = $batchEnd - $number;
+ | while ($loopCondition) {
+ | long $range = $batchEnd - $nextIndex;
| if ($range != 0L) {
| int $localEnd = (int)($range / ${step}L);
- | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
- | long $value = ((long)$localIdx * ${step}L) + $number;
- | ${consume(ctx, Seq(ev))}
- | $shouldStop
+ | $processingLoop
+ | } else {
+ | long $nextBatchTodo;
--- End diff --
good idea!
---
[GitHub] spark issue #22621: [SPARK-25602][SQL] SparkPlan.getByteArrayRdd should not ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22621
**[Test build #96920 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96920/testReport)** for PR 22621 at commit [`1c94d13`](https://github.com/apache/spark/commit/1c94d13895f4537111518e5b26075395bcc250b0).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---