Posted to commits@spark.apache.org by hv...@apache.org on 2020/02/17 19:53:12 UTC
[spark] branch branch-3.0 updated: [SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d6dcee4 [SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame
d6dcee4 is described below
commit d6dcee487d4d353eea78ee567c256025b6bb0eff
Author: wangguangxin.cn <wa...@gmail.com>
AuthorDate: Mon Feb 17 18:15:54 2020 +0100
[SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame
### What changes were proposed in this pull request?
We only need to do aggregate evaluation once per group in `UnboundedWindowFunctionFrame`
### Why are the changes needed?
Currently, `UnboundedWindowFunctionFrame.write` re-evaluates the processor for each row in a group, which is unnecessary, as explained below. This hurts performance when the evaluation is expensive (for example, `Percentile`'s `eval` has to sort its buffer and do some computation on every call). In our production workload, a SQL query using percentile with a window operation took more than 10 hours in Spark SQL versus about 10 minutes in Hive.
In fact, `UnboundedWindowFunctionFrame` can be treated as a `SlidingWindowFunctionFrame` with `lbound = UnboundedPreceding` and `ubound = UnboundedFollowing`, as its comments note. In that case, `SlidingWindowFunctionFrame` likewise performs the evaluation only once per group.
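The effect of the change can be illustrated with a small sketch. This is plain Python with hypothetical stand-ins for Spark's internal processor and frame (the class and function names below are invented for illustration; the real code lives in `WindowFunctionFrame.scala`):

```python
# Hypothetical stand-in for an expensive aggregate like Percentile:
# evaluate() sorts its buffer on every call.
class PercentileProcessor:
    def __init__(self, p):
        self.p = p
        self.buffer = []
        self.evaluations = 0  # count how often evaluate() runs

    def update(self, row):
        self.buffer.append(row)

    def evaluate(self):
        self.evaluations += 1
        s = sorted(self.buffer)            # O(n log n) on every call
        return s[int(self.p * (len(s) - 1))]

def write_old(processor, partition):
    # Before the patch: write() re-evaluates for every row in the group.
    return [processor.evaluate() for _ in partition]

def write_new(processor, partition):
    # After the patch: evaluate once (in prepare()), reuse for every row.
    result = processor.evaluate()
    return [result] * len(partition)

rows = list(range(100))
p_old, p_new = PercentileProcessor(0.95), PercentileProcessor(0.95)
for r in rows:
    p_old.update(r)
    p_new.update(r)

out_old = write_old(p_old, rows)
out_new = write_new(p_new, rows)
assert out_old == out_new                       # identical per-row results
print(p_old.evaluations, p_new.evaluations)     # 100 vs 1
```

Since the frame is unbounded in both directions, every row in the partition sees the same aggregate result, so caching the single evaluation drops the per-partition cost of an expensive `eval` from O(n) calls to one.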
The performance issue can be reproduced by running the following script in a local spark-shell:
```
spark.range(100*100).map(i => (i, "India")).toDF("uv", "country").createOrReplaceTempView("test")
sql("select uv, country, percentile(uv, 0.95) over (partition by country) as ptc95 from test").collect.foreach(println)
```
Before this patch, the query takes **128048 ms**.
With this patch, it takes **3485 ms**.
If we increase the data size to, say, 1000*1000, Spark cannot even produce a result without this patch (I've waited several hours).
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
Existing UT
Closes #27558 from WangGuangxin/windows.
Authored-by: wangguangxin.cn <wa...@gmail.com>
Signed-off-by: herman <he...@databricks.com>
---
.../apache/spark/sql/execution/window/WindowFunctionFrame.scala | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index d5f2ffa..181ff42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -277,6 +277,8 @@ final class UnboundedWindowFunctionFrame(
while (iterator.hasNext) {
processor.update(iterator.next())
}
+
+ processor.evaluate(target)
}
upperBound = rows.length
@@ -284,11 +286,8 @@ final class UnboundedWindowFunctionFrame(
/** Write the frame columns for the current row to the given target row. */
override def write(index: Int, current: InternalRow): Unit = {
- // Unfortunately we cannot assume that evaluation is deterministic. So we need to re-evaluate
- // for each row.
- if (processor != null) {
- processor.evaluate(target)
- }
+ // The results are the same for each row in the partition, and have been evaluated in prepare.
+ // Don't need to recalculate here.
}
override def currentLowerBound(): Int = lowerBound