Posted to issues@spark.apache.org by "Franz (Jira)" <ji...@apache.org> on 2020/01/17 14:19:00 UTC

[jira] [Created] (SPARK-30552) Chained spark column expressions with distinct windows specs produce inefficient DAG

Franz created SPARK-30552:
-----------------------------

             Summary: Chained spark column expressions with distinct windows specs produce inefficient DAG
                 Key: SPARK-30552
                 URL: https://issues.apache.org/jira/browse/SPARK-30552
             Project: Spark
          Issue Type: Bug
          Components: PySpark, Spark Core
    Affects Versions: 2.4.4
         Environment: INSTALLED VERSIONS
------------------
commit : None
python : 3.6.9.final.0
python-bits : 64
OS : Windows
OS-release : 10
machine : AMD64
processor : Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
byteorder : little
LC_ALL : None
LANG : de_DE.UTF-8
LOCALE : None.None

pandas : 0.25.3
numpy : 1.17.4
pytz : 2019.3
dateutil : 2.8.1
pip : 19.3.1
setuptools : 41.6.0.post20191030
Cython : None
pytest : 5.3.0
hypothesis : None
sphinx : 2.2.1
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 2.10.3
IPython : 7.11.1
pandas_datareader: None
bs4 : None
bottleneck : None
fastparquet : None
gcsfs : None
lxml.etree : None
matplotlib : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 0.15.1
pytables : None
s3fs : None
scipy : None
sqlalchemy : None
tables : None
xarray : None
xlrd : None
xlwt : None
xlsxwriter : None
            Reporter: Franz


h2. Context
Let's say you are dealing with time series data and your desired outcome relies on multiple window functions with distinct window specifications. The result may be expressed as a single Spark column expression, for example an identifier for intervals.

h2. Status Quo
Usually, I don't store intermediate results with `df.withColumn` but rather chain/stack column expressions and trust Spark to find the most efficient DAG (when dealing with DataFrames).
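
To illustrate what I mean by chaining, here is a minimal sketch (assuming some DataFrame `df` with numeric columns "a" and "b"; these names are just placeholders, not part of the example below):

{code:python}
from pyspark.sql import functions as F

# chained style: build one column expression out of sub-expressions,
# no intermediate column is ever materialized on the DataFrame
total = F.col("a") + F.col("b")
result = F.when(total > 0, total).otherwise(0)
df_chained = df.withColumn("result", result)

# withColumn style: store every intermediate result as its own column
df_stored = (
    df.withColumn("total", F.col("a") + F.col("b"))
      .withColumn("result", F.when(F.col("total") > 0, F.col("total")).otherwise(0))
      .drop("total")
)
{code}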

h2. Reproducible example
However, in the following example (PySpark 2.4.4 standalone), storing an intermediate result with `df.withColumn` reduces the DAG complexity. Let's consider the following test setup:

{code:python}
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),        
    }
)

df = spark.createDataFrame(dfp)
df.show(5)


+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   4|   1|
|   0|   2|   3|   0|
|   2|   0|   1|   0|
|   4|   1|   1|   2|
|   1|   3|   0|   4|
+----+----+----+----+
only showing top 5 rows
{code}



The computation itself is arbitrary. Basically, we have 2 window specs and 3 computational steps. The 3 steps depend on each other and use alternating window specs:


{code:python}
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func over 1st window
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3)
{code}


Inspecting the physical plan via `df_result.explain()` reveals 4 exchanges and sorts. However, only 3 should be necessary here because the window spec changes only twice.

{code:python}
df_result.explain()

== Physical Plan ==
*(7) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (_we0#25L > 1) THEN _we1#26L END AS result#22L]
+- Window [lag(_w0#23L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#25L], [col3#2L], [col4#3L ASC NULLS FIRST]
   +- *(6) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col3#2L, 200)
         +- *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _we1#26L]
            +- Window [max(_w1#24L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we1#26L], [col1#0L], [col2#1L ASC NULLS FIRST]
               +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col1#0L, 200)
                     +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _w1#24L]
                        +- Window [lag(_w0#27L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w1#24L], [col3#2L], [col4#3L ASC NULLS FIRST]
                           +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col3#2L, 200)
                                 +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#27L, lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#23L], [col1#0L], [col2#1L ASC NULLS FIRST]
                                    +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                                       +- Exchange hashpartitioning(col1#0L, 200)
                                          +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
{code}
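
To make the count explicit, the exchanges and sorts can be tallied from the textual plan (a rough sketch using plain string matching on the printed `explain()` output):

{code:python}
import io
from contextlib import redirect_stdout

def count_plan_nodes(df, *keywords):
    """Capture the printed output of df.explain() and count plan nodes
    by simple keyword matching."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain()
    plan = buf.getvalue()
    return {kw: plan.count(kw) for kw in keywords}

print(count_plan_nodes(df_result, "Exchange", "Sort"))
# -> {'Exchange': 4, 'Sort': 4} for the chained version above
{code}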



h2. Improvement
To get a better DAG, we slightly modify the code: the column expression for `step2` is stored with `withColumn`, and only a reference to this column is passed on. The new physical plan indeed requires only 3 shuffles!


{code:python}
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# save temporary
df = df.withColumn("tmp_variable", step2)
step2 = F.col("tmp_variable")

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3).drop("tmp_variable")
df_result.explain()

== Physical Plan ==
*(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (tmp_variable#33L > 1) THEN _we0#42L END AS result#41L]
+- Window [max(tmp_variable#33L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#42L], [col1#0L], [col2#1L ASC NULLS FIRST]
   +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#0L, 200)
         +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, tmp_variable#33L]
            +- Window [lag(_w0#34L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS tmp_variable#33L], [col3#2L], [col4#3L ASC NULLS FIRST]
               +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col3#2L, 200)
                     +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#34L], [col1#0L], [col2#1L ASC NULLS FIRST]
                        +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(col1#0L, 200)
                              +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
{code}
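
For what it's worth, this workaround can be wrapped in a small helper so the rest of the code keeps its chained style (the helper name `materialize` is made up here; it is just a sketch of the pattern used above):

{code:python}
from pyspark.sql import functions as F

def materialize(df, expr, name):
    """Store a column expression under a temporary column name and return
    the new DataFrame plus a plain reference to that column."""
    return df.withColumn(name, expr), F.col(name)

# assuming the fresh `df`, `w1` and `w2` from the reproducible example above
step1 = F.lag("col3").over(w1)
df_tmp, step2 = materialize(df, F.lag(step1).over(w2), "tmp_variable")
step3 = F.when(step2 > 1, F.max(step2).over(w1))
df_result = df_tmp.withColumn("result", step3).drop("tmp_variable")
{code}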



h2. Relevance
My original example was even more complex and resulted in an even greater difference in the DAG (up to 10 times slower on real-world data).

h2. Question
Does anyone have an explanation for this odd behavior? I had thought that stacking/chaining column expressions is best practice, since it allows Spark to optimize intermediate steps most effectively (in contrast to creating references for intermediate results).

Thanks in advance.




