Posted to commits@spark.apache.org by hv...@apache.org on 2020/02/17 19:53:12 UTC

[spark] branch branch-3.0 updated: [SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d6dcee4  [SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame
d6dcee4 is described below

commit d6dcee487d4d353eea78ee567c256025b6bb0eff
Author: wangguangxin.cn <wa...@gmail.com>
AuthorDate: Mon Feb 17 18:15:54 2020 +0100

    [SPARK-30806][SQL] Evaluate once per group in UnboundedWindowFunctionFrame
    
    ### What changes were proposed in this pull request?
    We only need to do the aggregate evaluation once per group in `UnboundedWindowFunctionFrame`.
    
    ### Why are the changes needed?
    Currently, `UnboundedWindowFunctionFrame.write` re-evaluates the processor for each row in a group, which is in fact unnecessary, as explained below. This hurts performance when the evaluation is time-consuming: a single evaluation of `Percentile`, for example, has to sort its buffer and do some calculation, so repeating it for every row makes the per-partition cost roughly quadratic. In our production workload, a SQL query that computes a percentile over a window takes more than 10 hours in Spark SQL but about 10 minutes in Hive.
    
    In fact, `UnboundedWindowFunctionFrame` can be treated as a `SlidingWindowFunctionFrame` with `lbound = UnboundedPreceding` and `ubound = UnboundedFollowing`, just as its comment says, and `SlidingWindowFunctionFrame` likewise does the evaluation only once per group.
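    
    The change itself is small: move the single `evaluate` call from the per-row `write` path into `prepare`. Here is a minimal, self-contained sketch of the resulting pattern, with placeholder types standing in for Spark's `AggregateProcessor` and `InternalRow` (an illustration only, not the actual Spark source; see the real diff below):
    ```
    // Placeholder processor interface, modeled on the update/evaluate calls in the diff.
    trait Processor[Row] {
      def update(row: Row): Unit        // fold one input row into the aggregation buffer
      def evaluate(target: Row): Unit   // write the final aggregate value into target
    }
    
    final class UnboundedFrameSketch[Row](processor: Processor[Row], target: Row) {
      def prepare(rows: Iterator[Row]): Unit = {
        rows.foreach(processor.update)  // visit every row of the partition once
        processor.evaluate(target)      // evaluate once: the frame spans the whole partition
      }
      // After the patch, write is a no-op: every row in the partition sees the same
      // frame, so the value already produced in prepare is reused for each output row.
      def write(index: Int, current: Row): Unit = ()
    }
    ```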
    
    The performance issue can be reproduced by running the following script in a local spark-shell:
    ```
    spark.range(100*100).map(i => (i, "India")).toDF("uv", "country").createOrReplaceTempView("test")
    sql("select uv, country, percentile(uv, 0.95) over (partition by country) as ptc95 from test").collect.foreach(println)
    ```
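    
    Since the window has no ORDER BY, it uses the default unbounded frame; spelling the frame out makes it explicit that the query goes through `UnboundedWindowFunctionFrame`:
    ```
    sql("""select uv, country,
                  percentile(uv, 0.95) over
                    (partition by country
                     rows between unbounded preceding and unbounded following) as ptc95
           from test""").collect.foreach(println)
    ```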
    Before this patch, the query takes **128048 ms**.
    With this patch, it takes **3485 ms**.
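    
    To re-measure, the query can be wrapped in `spark.time`, which prints the wall-clock time of the enclosed block (exact numbers will of course vary by machine):
    ```
    spark.time {
      sql("select uv, country, percentile(uv, 0.95) over (partition by country) as ptc95 from test").collect()
    }
    ```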
    
    If we increase the data size to 1000*1000, for example, Spark cannot even produce a result without this patch (I've waited for several hours).
    
    ### Does this PR introduce any user-facing change?
    NO
    
    ### How was this patch tested?
    Existing unit tests
    
    Closes #27558 from WangGuangxin/windows.
    
    Authored-by: wangguangxin.cn <wa...@gmail.com>
    Signed-off-by: herman <he...@databricks.com>
---
 .../apache/spark/sql/execution/window/WindowFunctionFrame.scala  | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index d5f2ffa..181ff42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -277,6 +277,8 @@ final class UnboundedWindowFunctionFrame(
       while (iterator.hasNext) {
         processor.update(iterator.next())
       }
+
+      processor.evaluate(target)
     }
 
     upperBound = rows.length
@@ -284,11 +286,8 @@ final class UnboundedWindowFunctionFrame(
 
   /** Write the frame columns for the current row to the given target row. */
   override def write(index: Int, current: InternalRow): Unit = {
-    // Unfortunately we cannot assume that evaluation is deterministic. So we need to re-evaluate
-    // for each row.
-    if (processor != null) {
-      processor.evaluate(target)
-    }
+    // The results are the same for every row in the partition and have already been
+    // evaluated in prepare, so there is no need to recalculate them here.
   }
 
   override def currentLowerBound(): Int = lowerBound


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org