Posted to issues@spark.apache.org by "Herman van Hovell (JIRA)" <ji...@apache.org> on 2016/06/22 01:42:57 UTC

[jira] [Closed] (SPARK-15326) Doing multiple unions on a Dataframe will result in a very inefficient query plan

     [ https://issues.apache.org/jira/browse/SPARK-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Herman van Hovell closed SPARK-15326.
-------------------------------------
    Resolution: Not A Problem

> Doing multiple unions on a Dataframe will result in a very inefficient query plan
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-15326
>                 URL: https://issues.apache.org/jira/browse/SPARK-15326
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1, 2.0.0
>            Reporter: Jurriaan Pruis
>         Attachments: Query Plan.pdf, skewed_join.py, skewed_join_plan.txt
>
>
> While working with a very skewed dataset I noticed that repeated unions on a DataFrame produce a query plan containing 2^n - 1 union nodes, where n is the number of unions performed. With large datasets this is very inefficient.
> I replicated this behaviour with the following PySpark example (the output of explain() is attached to this JIRA):
> {code}
> # sqlCtx is the SQLContext
> from pyspark.sql import functions as F
> df = sqlCtx.range(10000000)
> def r(name, max_val=100):
>     return F.round(F.lit(max_val) * F.pow(F.rand(), 4)).cast('integer').alias(name)
> # Create a skewed dataset
> skewed = df.select('id', r('a'), r('b'), r('c'), r('d'), r('e'), r('f'))
> # Find the skewed values in the dataset
> top_10_percent = skewed.freqItems(['a', 'b', 'c', 'd', 'e', 'f'], 0.10).collect()[0]
> def skewjoin(skewed, right, column, freqItems):
>     freqItems = freqItems[column + '_freqItems']
>     skewed = skewed.alias('skewed')
>     cond = F.col(column).isin(freqItems)
>     # First broadcast join the frequent (skewed) values
>     filtered = skewed.filter(cond).join(F.broadcast(right.filter(cond)), column, 'left_outer')
>     # Use a regular join for the non-skewed values (with big tables this will use a SortMergeJoin)
>     non_skewed = skewed.filter(~cond).join(right.filter(~cond), column, 'left_outer')
>     # Union the two halves and replace the column with the key found in the right DataFrame
>     return filtered.unionAll(non_skewed).select('skewed.*', right['id'].alias(column + '_key')).drop(column)
> # Create the dataframes that will be joined to the skewed dataframe
> right_size = 100
> df_a = sqlCtx.range(right_size).select('id', F.col('id').alias('a'))
> df_b = sqlCtx.range(right_size).select('id', F.col('id').alias('b'))
> df_c = sqlCtx.range(right_size).select('id', F.col('id').alias('c'))
> df_d = sqlCtx.range(right_size).select('id', F.col('id').alias('d'))
> df_e = sqlCtx.range(right_size).select('id', F.col('id').alias('e'))
> df_f = sqlCtx.range(right_size).select('id', F.col('id').alias('f'))
> # Join everything together
> df = skewed
> df = skewjoin(df, df_a, 'a', top_10_percent)
> df = skewjoin(df, df_b, 'b', top_10_percent)
> df = skewjoin(df, df_c, 'c', top_10_percent)
> df = skewjoin(df, df_d, 'd', top_10_percent)
> df = skewjoin(df, df_e, 'e', top_10_percent)
> df = skewjoin(df, df_f, 'f', top_10_percent)
> # df.explain() shows a plan containing 63 unions (2^(number_of_skewjoins) - 1),
> # which will be very inefficient and slow
> df.explain(True)
> # Evaluate the plan
> # You'd expect this to return 10000000, but it does not: it returned 10000140 on my system
> # (probably because the random columns are recomputed? Not sure though)
> print(df.count())
> {code}
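The 2^n - 1 growth reported above follows from each skewjoin call unioning two branches that each contain the entire previous plan, so every existing Union node is duplicated and one new one is added. A minimal sketch of that recurrence, using a toy plan model (not Spark's actual logical plan classes), reproduces the count of 63 for six skewjoin calls:

```python
# Toy model of the plan growth described above (illustrative only;
# these are hypothetical helpers, not Spark's planner).

def skewjoin_plan(plan):
    # Each skewjoin unions a broadcast-join branch with a regular-join
    # branch, and BOTH branches contain the prior plan, so every existing
    # Union node is duplicated and one new Union node is added.
    return ("Union", plan, plan)

def count_unions(plan):
    # Recursively count Union nodes in the toy plan tree.
    if isinstance(plan, tuple) and plan[0] == "Union":
        return 1 + count_unions(plan[1]) + count_unions(plan[2])
    return 0

plan = "Scan"
for _ in range(6):          # six skewjoin calls, as in the example above
    plan = skewjoin_plan(plan)

print(count_unions(plan))   # 2**6 - 1 = 63
```

The recurrence is u(k+1) = 2*u(k) + 1, giving 1, 3, 7, 15, 31, 63 unions after each successive skewjoin, matching the 63 unions seen in the attached explain() output.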



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
