You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by VIJAYAKUMAR JAWAHARLAL <sp...@data2o.io> on 2015/08/14 15:39:32 UTC

Left outer joining big data set with small lookups

Hi

I am facing huge performance problem when I am trying to left outer join very big data set (~140GB) with bunch of small lookups [Start schema type]. I am using data frame  in spark sql. It looks like data is shuffled and skewed when that join happens. Is there any way to improve performance of such type of join in spark? 

How can I hint optimizer to go with replicated join etc., to avoid shuffle? Would it help to create broadcast variables on small lookups?  If I create broadcast variables, how can I convert them into data frame and use them in sparksql type of join?

Thanks
Vijay
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Left outer joining big data set with small lookups

Posted by VIJAYAKUMAR JAWAHARLAL <sp...@data2o.io>.
Nope. Count action did not help to choose broadcast join.

All of my tables are hive external tables. So, I tried to trigger compute statistics from sqlContext.sql.  It gives me an error saying “nonsuch table”. I am not sure that is due to following bug in 1.4.1

https://issues.apache.org/jira/browse/SPARK-8105 <https://issues.apache.org/jira/browse/SPARK-8105>

I don’t find a way to enable broadcastHashjoin in my case :(


> On Aug 17, 2015, at 12:52 PM, Silvio Fiorito <si...@granturing.com> wrote:
> 
> Try doing a count on both lookups to force the caching to occur before the join.
> 
> 
> 
> 
> On 8/17/15, 12:39 PM, "VIJAYAKUMAR JAWAHARLAL" <sp...@data2o.io> wrote:
> 
>> Thanks for your help
>> 
>> I tried to cache the lookup tables and left out join with the big table (DF). Join does not seem to be using broadcast join-still it goes with hash partition join and shuffling big table. Here is the scenario
>> 
>> 
>> …
>> table1 as big_df
>> left outer join
>> table2 as lkup
>> on big_df.lkupid = lkup.lkupid
>> 
>> table1 above is well distributed across all 40 partitions because sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, using just 2 partition.  s. After the join stage, sparkUI showed me that all activities ended up in  just 2 executors. When I tried to dump the data in hdfs after join stage, all data ended up in 2 partition files and rest 38 files are 0 sized files.
>> 
>> Since above one did not work, I tried to broadcast DF and registered as table before join. 
>> 
>> val table2_df = sqlContext.sql("select * from table2")
>> val broadcast_table2 =sc.broadcast(table2_df)
>> broadcast_table2.value.registerTempTable(“table2”)
>> 
>> Broadcast is also having same issue as explained above. All data processed by just executors due to lookup skew.
>> 
>> Any more idea to tackle this issue in Spark Dataframe?
>> 
>> Thanks
>> Vijay
>> 
>> 
>>> On Aug 14, 2015, at 10:27 AM, Silvio Fiorito <si...@granturing.com> wrote:
>>> 
>>> You could cache the lookup DataFrames, it’ll then do a broadcast join.
>>> 
>>> 
>>> 
>>> 
>>> On 8/14/15, 9:39 AM, "VIJAYAKUMAR JAWAHARLAL" <sp...@data2o.io> wrote:
>>> 
>>>> Hi
>>>> 
>>>> I am facing huge performance problem when I am trying to left outer join very big data set (~140GB) with bunch of small lookups [Start schema type]. I am using data frame  in spark sql. It looks like data is shuffled and skewed when that join happens. Is there any way to improve performance of such type of join in spark? 
>>>> 
>>>> How can I hint optimizer to go with replicated join etc., to avoid shuffle? Would it help to create broadcast variables on small lookups?  If I create broadcast variables, how can I convert them into data frame and use them in sparksql type of join?
>>>> 
>>>> Thanks
>>>> Vijay
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>> 
>> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
> 


Re: Left outer joining big data set with small lookups

Posted by Silvio Fiorito <si...@granturing.com>.
Try doing a count on both lookups to force the caching to occur before the join.




On 8/17/15, 12:39 PM, "VIJAYAKUMAR JAWAHARLAL" <sp...@data2o.io> wrote:

>Thanks for your help
>
>I tried to cache the lookup tables and left out join with the big table (DF). Join does not seem to be using broadcast join-still it goes with hash partition join and shuffling big table. Here is the scenario
>
>
>…
>table1 as big_df
>left outer join
>table2 as lkup
>on big_df.lkupid = lkup.lkupid
>
>table1 above is well distributed across all 40 partitions because sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, using just 2 partition.  s. After the join stage, sparkUI showed me that all activities ended up in  just 2 executors. When I tried to dump the data in hdfs after join stage, all data ended up in 2 partition files and rest 38 files are 0 sized files.
>
>Since above one did not work, I tried to broadcast DF and registered as table before join. 
>
>val table2_df = sqlContext.sql("select * from table2")
>val broadcast_table2 =sc.broadcast(table2_df)
>broadcast_table2.value.registerTempTable(“table2”)
>
>Broadcast is also having same issue as explained above. All data processed by just executors due to lookup skew.
>
>Any more idea to tackle this issue in Spark Dataframe?
>
>Thanks
>Vijay
>
>
>> On Aug 14, 2015, at 10:27 AM, Silvio Fiorito <si...@granturing.com> wrote:
>> 
>> You could cache the lookup DataFrames, it’ll then do a broadcast join.
>> 
>> 
>> 
>> 
>> On 8/14/15, 9:39 AM, "VIJAYAKUMAR JAWAHARLAL" <sp...@data2o.io> wrote:
>> 
>>> Hi
>>> 
>>> I am facing huge performance problem when I am trying to left outer join very big data set (~140GB) with bunch of small lookups [Start schema type]. I am using data frame  in spark sql. It looks like data is shuffled and skewed when that join happens. Is there any way to improve performance of such type of join in spark? 
>>> 
>>> How can I hint optimizer to go with replicated join etc., to avoid shuffle? Would it help to create broadcast variables on small lookups?  If I create broadcast variables, how can I convert them into data frame and use them in sparksql type of join?
>>> 
>>> Thanks
>>> Vijay
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>> 
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Left outer joining big data set with small lookups

Posted by VIJAYAKUMAR JAWAHARLAL <sp...@data2o.io>.
Thanks for your help

I tried to cache the lookup tables and left out join with the big table (DF). Join does not seem to be using broadcast join-still it goes with hash partition join and shuffling big table. Here is the scenario


…
table1 as big_df
left outer join
table2 as lkup
on big_df.lkupid = lkup.lkupid

table1 above is well distributed across all 40 partitions because sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, using just 2 partition.  s. After the join stage, sparkUI showed me that all activities ended up in  just 2 executors. When I tried to dump the data in hdfs after join stage, all data ended up in 2 partition files and rest 38 files are 0 sized files.

Since above one did not work, I tried to broadcast DF and registered as table before join. 

val table2_df = sqlContext.sql("select * from table2")
val broadcast_table2 =sc.broadcast(table2_df)
broadcast_table2.value.registerTempTable(“table2”)

Broadcast is also having same issue as explained above. All data processed by just executors due to lookup skew.

Any more idea to tackle this issue in Spark Dataframe?

Thanks
Vijay


> On Aug 14, 2015, at 10:27 AM, Silvio Fiorito <si...@granturing.com> wrote:
> 
> You could cache the lookup DataFrames, it’ll then do a broadcast join.
> 
> 
> 
> 
> On 8/14/15, 9:39 AM, "VIJAYAKUMAR JAWAHARLAL" <sp...@data2o.io> wrote:
> 
>> Hi
>> 
>> I am facing huge performance problem when I am trying to left outer join very big data set (~140GB) with bunch of small lookups [Start schema type]. I am using data frame  in spark sql. It looks like data is shuffled and skewed when that join happens. Is there any way to improve performance of such type of join in spark? 
>> 
>> How can I hint optimizer to go with replicated join etc., to avoid shuffle? Would it help to create broadcast variables on small lookups?  If I create broadcast variables, how can I convert them into data frame and use them in sparksql type of join?
>> 
>> Thanks
>> Vijay
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>> 


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Left outer joining big data set with small lookups

Posted by Silvio Fiorito <si...@granturing.com>.
You could cache the lookup DataFrames, it’ll then do a broadcast join.




On 8/14/15, 9:39 AM, "VIJAYAKUMAR JAWAHARLAL" <sp...@data2o.io> wrote:

>Hi
>
>I am facing huge performance problem when I am trying to left outer join very big data set (~140GB) with bunch of small lookups [Start schema type]. I am using data frame  in spark sql. It looks like data is shuffled and skewed when that join happens. Is there any way to improve performance of such type of join in spark? 
>
>How can I hint optimizer to go with replicated join etc., to avoid shuffle? Would it help to create broadcast variables on small lookups?  If I create broadcast variables, how can I convert them into data frame and use them in sparksql type of join?
>
>Thanks
>Vijay
>---------------------------------------------------------------------
>To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Left outer joining big data set with small lookups

Posted by Raghavendra Pandey <ra...@gmail.com>.
In spark 1.4 there is a parameter to control that. Its default value is 10
M. So you need to cache your dataframe to hint the size.
On Aug 14, 2015 7:09 PM, "VIJAYAKUMAR JAWAHARLAL" <sp...@data2o.io>
wrote:

> Hi
>
> I am facing huge performance problem when I am trying to left outer join
> very big data set (~140GB) with bunch of small lookups [Start schema type].
> I am using data frame  in spark sql. It looks like data is shuffled and
> skewed when that join happens. Is there any way to improve performance of
> such type of join in spark?
>
> How can I hint optimizer to go with replicated join etc., to avoid
> shuffle? Would it help to create broadcast variables on small lookups?  If
> I create broadcast variables, how can I convert them into data frame and
> use them in sparksql type of join?
>
> Thanks
> Vijay
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>