Posted to issues@spark.apache.org by "Ron Serruya (Jira)" <ji...@apache.org> on 2023/10/15 16:23:00 UTC

[jira] [Created] (SPARK-45543) WindowGroupLimit Causes incorrect results when multiple windows are used

Ron Serruya created SPARK-45543:
-----------------------------------

             Summary: WindowGroupLimit Causes incorrect results when multiple windows are used
                 Key: SPARK-45543
                 URL: https://issues.apache.org/jira/browse/SPARK-45543
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, Spark Core, SQL
    Affects Versions: 3.5.0
            Reporter: Ron Serruya


First, this is my first bug report, so I hope I'm doing it right. Also, as I'm not very knowledgeable about Spark internals, I hope I diagnosed the problem correctly.

I found this regression in Spark version 3.5.0:

When using multiple windows that share the same partitioning and ordering (but with different frame boundaries), where one of the windows is a ranking function, a "WindowGroupLimit" node is added to the plan, causing the other windows to produce incorrect values.

*This behavior didn't exist in versions 3.3 and 3.4.*

Example:

 
{code:python}
import pyspark
from pyspark.sql import SparkSession, functions as F, Window

# assumes an active SparkSession named `spark` (as in the pyspark shell)
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    {'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
    {'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
    {'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
    {'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
])

# Create first window for row number
window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))

# Create additional window from the first window with unbounded frame
unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Keep the first row per partition (by year), and also collect all scores into a list
df2 = df.withColumn(
    'rn', 
    F.row_number().over(window_spec)
).withColumn(
    'all_scores', 
    F.collect_list('score').over(unbound_spec)
){code}
So far everything works as expected. If we display df2:

 
{noformat}
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Dave|1     |2    |2022|2  |[3, 2, 1] |
|Dave|1     |1    |2020|3  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+{noformat}
 

However, once we filter to keep only the first row number:

 
{noformat}
df2.filter("rn=1").show(truncate=False)
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3]       |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+{noformat}
As you can see, just filtering changed the "all_scores" array for Dave.

(This example uses `collect_list`; however, the same issue happens with other functions, such as max, min, count, etc. See the sketch below.)
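For example, here is a minimal variant of the same example using min and count instead (the column names min_score / n_scores are just for illustration); after the filter, Dave's values are computed only from the surviving row (3 and 1) instead of from all of his rows (1 and 3):
{code:python}
# Same windows as above, only the aggregate functions differ
df_other = df.withColumn(
    'rn',
    F.row_number().over(window_spec)
).withColumn(
    'min_score',
    F.min('score').over(unbound_spec)
).withColumn(
    'n_scores',
    F.count('score').over(unbound_spec)
)

# Before filtering, Dave has min_score=1 and n_scores=3;
# after filtering on rn=1, they become 3 and 1 respectively
df_other.filter("rn=1").show(truncate=False)
{code}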

 

Now, if instead of the two windows used above, I use the first window together with a window that has a different ordering, or create a completely new window with the same partitioning but no ordering, it works fine:
{code:python}
new_window = Window.partitionBy('row_id', 'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df3 = df.withColumn(
    'rn',
    F.row_number().over(window_spec)
).withColumn(
    'all_scores',
    F.collect_list('score').over(new_window)
)
df3.filter("rn=1").show(truncate=False){code}
{noformat}
+----+------+-----+----+---+----------+
|name|row_id|score|year|rn |all_scores|
+----+------+-----+----+---+----------+
|Dave|1     |3    |2023|1  |[3, 2, 1] |
|Amy |2     |6    |2021|1  |[6]       |
+----+------+-----+----+---+----------+
{noformat}
In addition, if we use all 3 windows to create 3 different columns, it also works fine (see the sketch below). So it seems the issue only happens when all the windows used share the same partitioning and ordering.
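For reference, a small sketch of what I mean by using all 3 windows together (reusing window_spec, unbound_spec and new_window from above; the column names are just illustrative):
{code:python}
df4 = df.withColumn(
    'rn',
    F.row_number().over(window_spec)
).withColumn(
    'scores_same_ordering',
    F.collect_list('score').over(unbound_spec)
).withColumn(
    'scores_no_ordering',
    F.collect_list('score').over(new_window)
)

# With all 3 windows present, the filtered result is correct again
df4.filter("rn=1").show(truncate=False)
{code}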

Here is the final plan for the faulty dataframe:
{noformat}
df2.filter("rn=1").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Filter (rn#9 = 1)
   +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
      +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Final
         +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
            +- Exchange hashpartitioning(row_id#1L, name#0, 200), ENSURE_REQUIREMENTS, [plan_id=425]
               +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Partial
                  +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
                     +- Scan ExistingRDD[name#0,row_id#1L,score#2L,year#3L]{noformat}
I suspect the issue is caused by the "WindowGroupLimit" node in the plan.

This node doesn't appear in the plans of the dataframes that work fine, nor in the plan before filtering on rn.

So I assume that since the optimizer detects that we only want to keep the first row of the ranking function, it prunes all the other rows before the remaining window computations run, which causes the incorrect results / loss of data.
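In other words, the filtered result looks as if the rows with rn > 1 had been dropped before the second window was evaluated. A rough sketch of the equivalent (incorrect) computation:
{code:python}
# Roughly what the optimized plan appears to compute for df2.filter("rn=1"):
# keep only the top-ranked row per (row_id, name) first, then evaluate the
# second window over the surviving rows only
pruned = df.withColumn('rn', F.row_number().over(window_spec)).filter('rn = 1')
pruned.withColumn('all_scores', F.collect_list('score').over(unbound_spec)).show(truncate=False)
# -> all_scores is [3] for Dave instead of [3, 2, 1], matching the wrong output above
{code}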

I think the bug comes from this change (and the attached PRs):

https://issues.apache.org/jira/browse/SPARK-44340

It was added in Spark 3.5.0. In addition, I noticed that it was included in the Databricks 13.3 release, which ships Spark 3.4.0 but also lists this change in its release notes. And indeed, this bug happens on Databricks 13.3 (Spark 3.4), but not on my local Spark 3.4.
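As a possible (untested) temporary workaround, excluding the optimizer rule that inserts the WindowGroupLimit node should restore the old behavior. This is only a sketch: I'm assuming, based on the linked change, that the rule is org.apache.spark.sql.catalyst.optimizer.InferWindowGroupLimit and that it can be excluded.
{code:python}
# Untested workaround sketch; the rule name below is my assumption based on
# the linked change, please verify before relying on it
spark.conf.set(
    "spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.InferWindowGroupLimit",
)
{code}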

Tagging user [~beliefer], as I believe you would know the most about this.


