You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Fernando Pereira (JIRA)" <ji...@apache.org> on 2018/02/02 19:10:00 UTC

[jira] [Reopened] (SPARK-17859) persist should not impede with spark's ability to perform a broadcast join.

     [ https://issues.apache.org/jira/browse/SPARK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fernando Pereira reopened SPARK-17859:
--------------------------------------

This bug persists
{code:java}
SPARK version 2.2.1

SparkSession available as 'spark'.

In [1]: df_large = spark.range(1e6)
In [2]: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
In [3]: df_small = spark.range(10)
In [5]: from pyspark.sql.functions import broadcast
In [6]: df_small = broadcast(spark.range(10).coalesce(1)).cache()

In [7]: df_large.join(df_small, "id").explain()
== Physical Plan ==
*Project [id#0L]
+- *SortMergeJoin [id#0L], [id#6L], Inner
   :- *Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#0L, 200)
   :     +- *Range (0, 1000000, step=1, splits=4)
   +- *Sort [id#6L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#6L, 200)
         +- InMemoryTableScan [id#6L]
               +- InMemoryRelation [id#6L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- Coalesce 1
                        +- *Range (0, 10, step=1, splits=4)

In [8]: df_large.join(df_small.unpersist(), "id").explain()
== Physical Plan ==
*Project [id#0L]
+- *BroadcastHashJoin [id#0L], [id#6L], Inner, BuildRight
   :- *Range (0, 1000000, step=1, splits=4)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- Coalesce 1
         +- *Range (0, 10, step=1, splits=4)

{code}

> persist should not impede with spark's ability to perform a broadcast join.
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-17859
>                 URL: https://issues.apache.org/jira/browse/SPARK-17859
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.0.0
>         Environment: spark 2.0.0 , Linux RedHat
>            Reporter: Franck Tago
>            Priority: Major
>             Fix For: 2.0.2
>
>
> I am using Spark 2.0.0 
> My investigation leads me to conclude that calling persist could prevent broadcast join  from happening .
> Example
> Case1: No persist call 
> var  df1 =spark.range(1000000).select($"id".as("id1"))
> df1: org.apache.spark.sql.DataFrame = [id1: bigint]
>  var df2 =spark.range(1000).select($"id".as("id2"))
> df2: org.apache.spark.sql.DataFrame = [id2: bigint]
>  df1.join(df2 , $"id1" === $"id2" ).explain 
> == Physical Plan ==
> *BroadcastHashJoin [id1#117L], [id2#123L], Inner, BuildRight
> :- *Project [id#114L AS id1#117L]
> :  +- *Range (0, 1000000, splits=2)
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
>    +- *Project [id#120L AS id2#123L]
>       +- *Range (0, 1000, splits=2)
> Case 2:  persist call 
>  df1.persist.join(df2 , $"id1" === $"id2" ).explain 
> 16/10/10 15:50:21 WARN CacheManager: Asked to cache already cached data.
> == Physical Plan ==
> *SortMergeJoin [id1#3L], [id2#9L], Inner
> :- *Sort [id1#3L ASC], false, 0
> :  +- Exchange hashpartitioning(id1#3L, 10)
> :     +- InMemoryTableScan [id1#3L]
> :        :  +- InMemoryRelation [id1#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
> :        :     :  +- *Project [id#0L AS id1#3L]
> :        :     :     +- *Range (0, 1000000, splits=2)
> +- *Sort [id2#9L ASC], false, 0
>    +- Exchange hashpartitioning(id2#9L, 10)
>       +- InMemoryTableScan [id2#9L]
>          :  +- InMemoryRelation [id2#9L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>          :     :  +- *Project [id#6L AS id2#9L]
>          :     :     +- *Range (0, 1000, splits=2)
> Why does the persist call prevent the broadcast join . 
> My opinion is that it should not .
> I was made aware that the persist call is  lazy and that might have something to do with it , but I still contend that it should not . 
> Losing broadcast joins is really costly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org