You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dovi Joel (Jira)" <ji...@apache.org> on 2020/08/13 07:21:00 UTC

[jira] [Commented] (SPARK-30552) Chained spark column expressions with distinct windows specs produce inefficient DAG

    [ https://issues.apache.org/jira/browse/SPARK-30552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176803#comment-17176803 ] 

Dovi Joel commented on SPARK-30552:
-----------------------------------

I have also been experiencing this issue, in Spark 3.0.0. Is there anywhere I can look to try and resolve this?

> Chained spark column expressions with distinct windows specs produce inefficient DAG
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-30552
>                 URL: https://issues.apache.org/jira/browse/SPARK-30552
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 2.4.4
>         Environment: python : 3.6.9.final.0
>  python-bits : 64
>  OS : Windows
>  OS-release : 10
>  machine : AMD64
>  processor : Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
> pyspark: 2.4.4
> pandas : 0.25.3
>  numpy : 1.17.4
> pyarrow : 0.15.1
>            Reporter: Franz
>            Priority: Major
>
> h2.  Context
> Let's say you deal with time series data. Your desired outcome relies on multiple window functions with distinct window specifications. The result may resemble a single spark column expression, like an identifier for intervals.
> h2. Status Quo
> Usually, I don't store intermediate results with `df.withColumn` but rather chain/stack column expressions and trust Spark to find the most effective DAG (when dealing with DataFrame).
> h2. Reproducible example
> However, in the following example (PySpark 2.4.4 standalone), storing an intermediate result with `df.withColumn` reduces the DAG complexity. Let's consider following test setup:
> {code:python}
> import pandas as pd
> import numpy as np
> from pyspark.sql import SparkSession, Window
> from pyspark.sql import functions as F
> spark = SparkSession.builder.getOrCreate()
> dfp = pd.DataFrame(
>     {
>         "col1": np.random.randint(0, 5, size=100),
>         "col2": np.random.randint(0, 5, size=100),
>         "col3": np.random.randint(0, 5, size=100),
>         "col4": np.random.randint(0, 5, size=100),        
>     }
> )
> df = spark.createDataFrame(dfp)
> df.show(5)
> +----+----+----+----+
> |col1|col2|col3|col4|
> +----+----+----+----+
> |   1|   2|   4|   1|
> |   0|   2|   3|   0|
> |   2|   0|   1|   0|
> |   4|   1|   1|   2|
> |   1|   3|   0|   4|
> +----+----+----+----+
> only showing top 5 rows
> {code}
> The computation is arbitrary. Basically we have 2 window specs and 3 computational steps. The 3 computational steps are dependend on each other and use alternating window specs:
> {code:python}
> w1 = Window.partitionBy("col1").orderBy("col2")
> w2 = Window.partitionBy("col3").orderBy("col4")
> # first step, arbitrary window func over 1st window
> step1 = F.lag("col3").over(w1)
> # second step, arbitrary window func over 2nd window with step 1
> step2 = F.lag(step1).over(w2)
> # third step, arbitrary window func over 1st window with step 2
> step3 = F.when(step2 > 1, F.max(step2).over(w1))
> df_result = df.withColumn("result", step3)
> {code}
> Inspecting the phyiscal plan via `df_result.explain()` reveals 4 exchanges and sorts! However, only 3 should be necessary here because we change the window spec only twice. 
> {code:python}
> df_result.explain()
> == Physical Plan ==
> *(7) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (_we0#25L > 1) THEN _we1#26L END AS result#22L]
> +- Window [lag(_w0#23L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#25L], [col3#2L], [col4#3L ASC NULLS FIRST]
>    +- *(6) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(col3#2L, 200)
>          +- *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _we1#26L]
>             +- Window [max(_w1#24L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we1#26L], [col1#0L], [col2#1L ASC NULLS FIRST]
>                +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(col1#0L, 200)
>                      +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _w1#24L]
>                         +- Window [lag(_w0#27L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w1#24L], [col3#2L], [col4#3L ASC NULLS FIRST]
>                            +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
>                               +- Exchange hashpartitioning(col3#2L, 200)
>                                  +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#27L, lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#23L], [col1#0L], [col2#1L ASC NULLS FIRST]
>                                     +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
>                                        +- Exchange hashpartitioning(col1#0L, 200)
>                                           +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
> {code}
> h2. Improvement
> To get a better DAG, we slightly modify the code to store the column expression of `step2` with `withColumn` and just pass the reference of this column. The new logical plan requires only 3 shuffles indeed! 
> {code:python}
> w1 = Window.partitionBy("col1").orderBy("col2")
> w2 = Window.partitionBy("col3").orderBy("col4")
> # first step, arbitrary window func
> step1 = F.lag("col3").over(w1)
> # second step, arbitrary window func over 2nd window with step 1
> step2 = F.lag(step1).over(w2)
> # save temporary
> df = df.withColumn("tmp_variable", step2)
> step2 = F.col("tmp_variable")
> # third step, arbitrary window func over 1st window with step 2
> step3 = F.when(step2 > 1, F.max(step2).over(w1))
> df_result = df.withColumn("result", step3).drop("tmp_variable")
> df_result.explain()
> == Physical Plan ==
> *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (tmp_variable#33L > 1) THEN _we0#42L END AS result#41L]
> +- Window [max(tmp_variable#33L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#42L], [col1#0L], [col2#1L ASC NULLS FIRST]
>    +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(col1#0L, 200)
>          +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, tmp_variable#33L]
>             +- Window [lag(_w0#34L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS tmp_variable#33L], [col3#2L], [col4#3L ASC NULLS FIRST]
>                +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(col3#2L, 200)
>                      +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#34L], [col1#0L], [col2#1L ASC NULLS FIRST]
>                         +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
>                            +- Exchange hashpartitioning(col1#0L, 200)
>                               +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
> {code}
> h2. Relevance
> My original example was even more complex and resulted in an even greater difference of the DAG (on real world data up to 10 times slower)
> h2. Question
> Does anyone have an answer to this odd behavior? I've thought that stacking/chaining column expressions is best practice since it allows Spark to optimize intermediate steps most effectively (in contrast to create references for intermediate results). 
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org