Posted to issues@spark.apache.org by "Artem Bergkamp (Jira)" <ji...@apache.org> on 2020/10/20 08:09:00 UTC

[jira] [Updated] (SPARK-28771) Join partitioned dataframes on superset of partitioning columns without shuffle

     [ https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artem Bergkamp updated SPARK-28771:
-----------------------------------
    Description: 
Hi Spark developers.

A few months ago I asked this question on [stackoverflow|https://stackoverflow.com/questions/55229290/question-about-joining-dataframes-in-spark] but did not get a usable solution.
 The only valid suggestion was to implement it via Catalyst optimizer extensions, but this is not something that an ordinary user can do.
 I decided to raise an improvement request since I think such functionality should be available out of the box.

Suppose I have two partitioned dataframes:
{code:python}
df1 = spark.createDataFrame(
    [(1,1,1), (2,2,2)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
df2 = spark.createDataFrame(
    [(1,1,1)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
{code}
*(scenario 1)* If I join them on [key1, key2], the join is performed within each partition without a shuffle (the number of partitions in the resulting dataframe stays the same):
{code:python}
x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3
{code}
*(scenario 2)* But if I join them on [key1, key2, time], a shuffle takes place (the resulting dataframe has 10 partitions, which is driven by the spark.sql.shuffle.partitions option):
{code:python}
x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 10
{code}
*(scenario 3)* Joining them on [key1, key2, time] via the other form of the join method, with explicit column expressions, also shuffles:
{code:python}
x = df1.join(df2, [
    df1['key1'] == df2['key1'],
    df1['key2'] == df2['key2'],
    df1['time'] == df2['time']
], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
assert x.rdd.getNumPartitions() == 10
{code}
*(scenario 4)* Joining them on [key1, key2, time] via the same form of the join method, but with the equality condition on time replaced by an equivalent pair of inequalities. Surprisingly, *it uses the existing partitioning*:
{code:python}
x = df1.join(df2, [
    df1['key1'] == df2['key1'],
    df1['key2'] == df2['key2'],
    (df1['time'] <= df2['time']) & (df1['time'] >= df2['time'])
], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
assert x.rdd.getNumPartitions() == 3
{code}
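The claim that the condition in scenario 4 is equivalent to the plain equality in scenario 3 can be checked with a small pure-Python sketch. It models SQL three-valued logic with None standing in for NULL; the helpers `sql_eq`, `sql_le`, `sql_ge` and `sql_and` are hypothetical names introduced only for this illustration, and the check covers integers and NULL only. It says nothing about why the planner treats the two forms differently.

```python
def sql_eq(a, b):
    """SQL-style equality: NULL (None) if either operand is NULL."""
    return None if a is None or b is None else a == b

def sql_le(a, b):
    """SQL-style <=: NULL (None) if either operand is NULL."""
    return None if a is None or b is None else a <= b

def sql_ge(a, b):
    """SQL-style >=: NULL (None) if either operand is NULL."""
    return None if a is None or b is None else a >= b

def sql_and(x, y):
    """Three-valued AND: False dominates, then NULL, otherwise True."""
    if x is False or y is False:
        return False
    if x is None or y is None:
        return None
    return True

values = [None, 1, 2, 3]

# Collect every pair where the two predicates disagree.
mismatches = [
    (a, b)
    for a in values
    for b in values
    if sql_eq(a, b) is not sql_and(sql_le(a, b), sql_ge(a, b))
]

assert mismatches == []
```

Under these assumptions the two predicates agree on every pair, including the NULL cases, so scenarios 3 and 4 describe the same join.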
*I expect all four described join scenarios to use partitioning and avoid shuffle.*

At the same time, groupby and window operations on [key1, key2, time] preserve the number of partitions and are done without a shuffle.
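The expectation above rests on a simple property of hash partitioning: two rows that agree on [key1, key2, time] also agree on [key1, key2], so they already sit in the same partition on both sides. Below is a minimal pure-Python sketch of that argument, not Spark code. The helpers `key_of`, `hash_partition`, `partition_local_join` and `global_join` are hypothetical, Python's built-in `hash` stands in for Spark's partitioning hash (which is a different function), and an inner join is used for brevity instead of the left join from the scenarios.

```python
from collections import defaultdict
from itertools import product

COLUMNS = {"key1": 0, "key2": 1, "time": 2}
NUM_PARTITIONS = 3

def key_of(row, cols):
    """Project a row (a tuple) onto the named columns."""
    return tuple(row[COLUMNS[c]] for c in cols)

def hash_partition(rows, cols, n=NUM_PARTITIONS):
    """Distribute rows into n partitions by hashing the given columns."""
    parts = defaultdict(list)
    for row in rows:
        parts[hash(key_of(row, cols)) % n].append(row)
    return parts

def partition_local_join(left, right, part_cols, join_cols, n=NUM_PARTITIONS):
    """Inner join that only compares rows within the same partition index."""
    lparts = hash_partition(left, part_cols, n)
    rparts = hash_partition(right, part_cols, n)
    return sorted(
        (l, r)
        for i in range(n)
        for l, r in product(lparts[i], rparts[i])
        if key_of(l, join_cols) == key_of(r, join_cols)
    )

def global_join(left, right, join_cols):
    """Reference inner join that compares every left row with every right row."""
    return sorted(
        (l, r)
        for l, r in product(left, right)
        if key_of(l, join_cols) == key_of(r, join_cols)
    )

# Rows are (key1, key2, time) tuples.
left = [(k1, k2, t) for k1 in range(4) for k2 in range(4) for t in range(3)]
right = [(k1, k2, t) for k1 in range(0, 4, 2) for k2 in range(4) for t in range(1, 4)]

# Partition on [key1, key2] only, but join on the superset [key1, key2, time].
local = partition_local_join(left, right, ["key1", "key2"], ["key1", "key2", "time"])
full = global_join(left, right, ["key1", "key2", "time"])

assert local == full
```

The partition-local join finds exactly the same pairs as the full comparison, which is why no additional data movement should be needed when the join columns are a superset of the partitioning columns.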


> Join partitioned dataframes on superset of partitioning columns without shuffle
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-28771
>                 URL: https://issues.apache.org/jira/browse/SPARK-28771
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.0
>         Environment:  
> {code:python}
> from pyspark import SparkConf
> from pyspark.sql import SparkSession
>
> conf = SparkConf()
> # setAll expects an iterable of (key, value) pairs
> conf.setAll({
>   'spark.master': 'local[*]',
>   'spark.sql.execution.arrow.enabled': 'true',
>   # disable automatic broadcast joins
>   'spark.sql.autoBroadcastJoinThreshold': '-1',
>   # partition count produced by a shuffle; the scenarios use it to detect one
>   'spark.sql.shuffle.partitions': '10'
> }.items())
> spark = SparkSession.builder.config(conf=conf).getOrCreate()
> {code}
>  
>  
>            Reporter: Artem Bergkamp
>            Priority: Minor
>


