Posted to issues@spark.apache.org by "Jiaan Geng (Jira)" <ji...@apache.org> on 2023/10/16 13:35:00 UTC

[jira] [Updated] (SPARK-45543) WindowGroupLimit causes incorrect results if window expressions exists non-rank window function

     [ https://issues.apache.org/jira/browse/SPARK-45543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiaan Geng updated SPARK-45543:
-------------------------------
    Summary: WindowGroupLimit causes incorrect results if window expressions exists non-rank window function  (was: WindowGroupLimit Causes incorrect results when window expressions exists non-rank function)

> WindowGroupLimit causes incorrect results if window expressions exists non-rank window function
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45543
>                 URL: https://issues.apache.org/jira/browse/SPARK-45543
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, Spark Core, SQL
>    Affects Versions: 3.5.0
>            Reporter: Ron Serruya
>            Assignee: Jiaan Geng
>            Priority: Critical
>              Labels: correctness, data-loss
>
> First, this is my first bug report, so I hope I'm doing it right; also, as I'm not very knowledgeable about Spark internals, I hope I diagnosed the problem correctly.
> I found this regression in Spark 3.5.0:
> When using multiple windows that share the same partitioning and ordering (but with different frame boundaries), where one of the windows is a ranking function, a WindowGroupLimit node is added to the plan, causing wrong values to be produced by the other windows.
> *This behavior didn't exist in versions 3.3 and 3.4.*
> Example:
>  
> {code:python}
> from pyspark.sql import SparkSession, functions as F, Window
> # `spark` is the active session; create one if not running in the pyspark shell.
> spark = SparkSession.builder.getOrCreate()
> df = spark.createDataFrame([
>     {'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
>     {'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
>     {'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
>     {'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
> ])
> # Create first window for row number
> window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))
> # Create additional window from the first window with unbounded frame
> unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
> # Try to keep the first row by year, and also collect all scores into a list
> df2 = df.withColumn(
>     'rn', 
>     F.row_number().over(window_spec)
> ).withColumn(
>     'all_scores', 
>     F.collect_list('score').over(unbound_spec)
> )
> {code}
> So far everything works, and if we display df2:
>  
> {noformat}
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3, 2, 1] |
> |Dave|1     |2    |2022|2  |[3, 2, 1] |
> |Dave|1     |1    |2020|3  |[3, 2, 1] |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+
> {noformat}
>  
> However, once we filter to keep only the first row number:
>  
> {noformat}
> df2.filter("rn=1").show(truncate=False)
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3]       |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+
> {noformat}
> As you can see, just filtering changed the "all_scores" array for Dave.
> (This example uses `collect_list`; however, the same thing happens with other window functions, such as max, min, count, etc. A sketch with `min` follows.)
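> To illustrate (the name `df_min` is mine, not from the original report): the true partition-wide minimum for Dave is 1, but if WindowGroupLimit prunes the other rows first, the window only sees the surviving row and would presumably return 3.
> {code:python}
> # Hypothetical variation of the repro above: same two windows, min() instead
> # of collect_list(). Expected correct value for Dave's rn=1 row: min_score=1.
> df_min = df.withColumn(
>     'rn',
>     F.row_number().over(window_spec)
> ).withColumn(
>     'min_score',
>     F.min('score').over(unbound_spec)
> )
> df_min.filter("rn=1").show(truncate=False)
> {code}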
>  
> Now, if instead of the two windows above I use the first window together with a window that has a different ordering, or create a completely new window with the same partitioning but no ordering, it works fine:
> {code:python}
> new_window = Window.partitionBy('row_id', 'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
> df3 = df.withColumn(
>     'rn',
>     F.row_number().over(window_spec)
> ).withColumn(
>     'all_scores',
>     F.collect_list('score').over(new_window)
> )
> df3.filter("rn=1").show(truncate=False)
> {code}
> {noformat}
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3, 2, 1] |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+
> {noformat}
> In addition, if we use all 3 windows to create 3 different columns, it also works correctly (see the sketch below). So it seems the issue happens only when all the windows used share the same partitioning and ordering.
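> A sketch of that three-window case, reusing the windows defined above (`df4` is my name for it, not from the original report):
> {code:python}
> # All three windows together: because new_window does not share the ordering
> # of window_spec/unbound_spec, the faulty pruning is not applied and both
> # collect_list columns come out correct.
> df4 = df.withColumn(
>     'rn', F.row_number().over(window_spec)
> ).withColumn(
>     'scores_same_order', F.collect_list('score').over(unbound_spec)
> ).withColumn(
>     'scores_no_order', F.collect_list('score').over(new_window)
> )
> df4.filter("rn=1").show(truncate=False)
> {code}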
> Here is the final plan for the faulty dataframe:
> {noformat}
> df2.filter("rn=1").explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Filter (rn#9 = 1)
>    +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
>       +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Final
>          +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
>             +- Exchange hashpartitioning(row_id#1L, name#0, 200), ENSURE_REQUIREMENTS, [plan_id=425]
>                +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Partial
>                   +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
>                      +- Scan ExistingRDD[name#0,row_id#1L,score#2L,year#3L]
> {noformat}
> I suspect the issue is caused by the WindowGroupLimit node in the plan.
> This node doesn't appear in the plans of the dataframes that work correctly, nor in the plan before filtering on rn.
> So I assume that because the optimizer detects that we only want to keep the first row of the ranking function, it removes all other rows before the subsequent window computations run, which causes the incorrect results and data loss (a possible workaround sketch follows).
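> If my diagnosis is right, a possible workaround (my own suggestion, not verified in the original report) is to disable the rule that inserts WindowGroupLimit. Assuming the `spark.sql.optimizer.windowGroupLimitThreshold` config added for this optimization, setting it to -1 should turn the rule off:
> {code:python}
> # Assumed workaround: -1 disables the window group limit optimization, so the
> # plan no longer prunes rows before the other window functions are evaluated.
> spark.conf.set("spark.sql.optimizer.windowGroupLimitThreshold", -1)
> df2.filter("rn=1").show(truncate=False)  # should now show [3, 2, 1] for Dave
> {code}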
> I think the bug comes from this change (and the attached PRs):
> https://issues.apache.org/jira/browse/SPARK-44340
> It was added in Spark 3.5.0. I also noticed that it was included in Databricks Runtime 13.3, which ships Spark 3.4.0 but lists this change in its release notes. And indeed, this bug happens on Databricks Runtime 13.3 (Spark 3.4), but not on my local Spark 3.4.
> Tagging [~beliefer], as I believe you would know the most about this.


