Posted to user@spark.apache.org by Srikanth <sr...@gmail.com> on 2016/01/28 21:26:57 UTC

Broadcast join on multiple dataframes

Hello,

I have a use case where one large table has to be joined with several
smaller tables.
I've added a broadcast hint to every small table in the joins.

    import org.apache.spark.sql.functions.broadcast
    import sqlContext.implicits._ // enables the $"column" syntax

    // Reader options and .load(...) calls for most inputs were omitted in the original post.
    val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")

    val metaActionDF = sqlContext.read.format("json")
    val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
    val metaLocationDF = sqlContext.read.format("json").option("samplingRatio", "0.1").load(metaLocationFile)
      .join(broadcast(metaActionDF), "campaign_id")
      .join(broadcast(cidOrgDF), List("organization_id"), "left_outer")

    val metaCreativeDF = sqlContext.read.format("json")
    val metaExchangeDF = sqlContext.read.format("json")
    val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
    val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")

    val joinedBidderDF = largeTableDF.as("BID")
      .join(broadcast(metaLocationDF), "strategy_id")
      .join(broadcast(metaCreativeDF), "creative_id")
      .join(broadcast(metaExchangeDF), $"exchange_id" === $"id", "left_outer")
      .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key", "left_outer")
      .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id", "left_outer")
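For readers new to the operator names in the plans below: a broadcast hash join ships a copy of the small table to every executor, builds an in-memory hash table from it, and streams the large table past it, so the large side is never repartitioned. The following is a toy, single-process sketch of that idea (left outer variant) in plain Scala collections. It is not Spark's implementation, and `BroadcastJoinSketch` and its methods are hypothetical names.

```scala
// Toy, single-process model of a broadcast hash (left outer) join.
// Plain Scala collections only; hypothetical names, not Spark's implementation.
object BroadcastJoinSketch {
  // Build phase: turn the small side into a key -> rows hash map.
  // In Spark, each executor would receive its own copy of this map.
  def buildSide[K, S](small: Seq[S])(key: S => K): Map[K, Seq[S]] =
    small.groupBy(key)

  // Probe phase: one pass over the large side, in its existing order.
  // Large rows without a match are kept, paired with None (left outer join).
  def leftOuterJoin[K, L, S](large: Iterator[L], built: Map[K, Seq[S]])(
      key: L => K): Iterator[(L, Option[S])] =
    large.flatMap { l =>
      built.get(key(l)) match {
        case Some(matches) => matches.iterator.map(s => (l, Some(s): Option[S]))
        case None          => Iterator.single((l, None: Option[S]))
      }
    }
}
```

Because `leftOuterJoin` consumes the large side as a single pass over an `Iterator`, that side keeps its original partitioning, which is the property wanted for largeTableDF here.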

When I look at the execution plan, all the joins are marked as broadcast joins.
But in the Spark job UI, the DAG visualization shows that some joins are
sort-merge joins with a shuffle involved.
The ones I've highlighted in yellow were shuffled.
The DAG can be viewed here:
https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0
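By contrast, a sort-merge join needs both inputs partitioned and sorted on the join key, which is why the shuffled joins come with an exchange step (TungstenExchange hashpartitioning(..., 480) in the later plans in this thread). Below is a toy model of the three steps (hash-partition, sort each partition, merge) in plain Scala, assuming Int keys. The partition formula mirrors Spark's RDD HashPartitioner (non-negative `hashCode` mod n); Spark SQL's exchange may hash differently. `SortMergeSketch` and its methods are hypothetical.

```scala
// Toy model of shuffle + sort-merge left outer join with Int keys.
// Plain Scala collections only; hypothetical names, not Spark's implementation.
object SortMergeSketch {
  // "Exchange" step: choose a partition as non-negative (hashCode mod n),
  // the formula Spark's RDD HashPartitioner uses. Spark SQL may hash differently.
  def partitionOf(key: Int, n: Int): Int = {
    val raw = key.hashCode % n
    if (raw < 0) raw + n else raw
  }

  // Route every row to its partition; this is the data movement a shuffle implies.
  def shuffle[R](rows: Seq[R], n: Int)(key: R => Int): Vector[Seq[R]] = {
    val byPart = rows.groupBy(r => partitionOf(key(r), n))
    Vector.tabulate(n)(p => byPart.getOrElse(p, Seq.empty[R]))
  }

  // Merge step: both inputs must already be sorted by key.
  def mergeLeftOuter[L, R](left: Seq[L], right: Seq[R])(
      lk: L => Int, rk: R => Int): Seq[(L, Option[R])] = {
    val rv = right.toIndexedSeq
    val out = Vector.newBuilder[(L, Option[R])]
    var j = 0
    for (l <- left) {
      val k = lk(l)
      // Right rows with smaller keys cannot match this or any later left row.
      while (j < rv.length && rk(rv(j)) < k) j += 1
      // Emit every right row with an equal key; j is not advanced past them,
      // so a later left row with the same key can match them again.
      var m = j
      while (m < rv.length && rk(rv(m)) == k) {
        out += ((l, Some(rv(m))))
        m += 1
      }
      if (m == j) out += ((l, None)) // no match: keep the left row (outer join)
    }
    out.result()
  }

  // Full pipeline: shuffle both sides, sort each partition, merge partition by partition.
  def join[L, R](left: Seq[L], right: Seq[R], n: Int)(
      lk: L => Int, rk: R => Int): Seq[(L, Option[R])] = {
    val lp = shuffle(left, n)(lk)
    val rp = shuffle(right, n)(rk)
    (0 until n).flatMap(p => mergeLeftOuter(lp(p).sortBy(lk), rp(p).sortBy(rk))(lk, rk))
  }
}
```

Equal keys from both sides always land in the same partition, so each partition can be merged independently. The cost is that every row of both inputs, including the large one, is moved during the shuffle step.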

Why does the actual execution, as seen in the DAG, differ from the physical
plan pasted below?
I'm trying to avoid shuffling my large table. Any idea what is causing this?

== Physical Plan ==
BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
:- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
:  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
:  :  :- Project [...]
:  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
:  :  :     :- Project [...]
:  :  :     :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
:  :  :     :     :- ConvertToUnsafe
:  :  :     :     :  +- Scan CsvRelation(<function0>,Some(file:///shared/data/bidder/*.lzo),false,
:  :  :     :     +- Project [...]
:  :  :     :        +- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
:  :  :     :           :- Project [...]
:  :  :     :           :  +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
:  :  :     :           :     :- Project [...]
:  :  :     :           :     :  +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
:  :  :     :           :     +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
:  :  :     :           +- ConvertToUnsafe
:  :  :     :              +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
:  :  :     +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
:  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
:  +- ConvertToUnsafe
:     +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/technology_key.txt),false,
+- ConvertToUnsafe
   +- Scan CsvRelation(<function0>,Some(file:///shared/data/t1_meta/browser_languages.osv),false



Srikanth

Re: Broadcast join on multiple dataframes

Posted by Srikanth <sr...@gmail.com>.
Hello,

Any pointers on what is causing the optimizer to turn a broadcast join into
a shuffle join?
This join is with a file that is just 4 KB in size.
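A simplified model of how the strategy for one join input gets picked, assuming Spark 1.x behavior: a `broadcast()` hint that the planner sees on the input forces a broadcast; otherwise the input is broadcast only if its estimated size is at most `spark.sql.autoBroadcastJoinThreshold` (default 10 MB; -1 disables it). By size alone a 4 KB file qualifies, so under this model a shuffle means the planner had neither a usable size estimate nor a visible hint for that input. `BroadcastDecisionSketch` and its parameters are hypothetical; this is an illustration, not Spark's planner code.

```scala
// Simplified model of the broadcast-vs-shuffle decision for one join input.
// Hypothetical names; an illustration, not Spark's planner code.
object BroadcastDecisionSketch {
  // Default value of spark.sql.autoBroadcastJoinThreshold in Spark 1.x: 10 MB.
  val DefaultThresholdBytes: Long = 10L * 1024 * 1024

  sealed trait Strategy
  case object Broadcast extends Strategy
  case object ShuffleSortMerge extends Strategy

  // hintVisible: the planner sees a broadcast() hint on this join input.
  // estimatedBytes: None models an input whose size estimate is unusable.
  // A threshold <= 0 models auto-broadcast being disabled (e.g. -1).
  def choose(hintVisible: Boolean,
             estimatedBytes: Option[Long],
             thresholdBytes: Long = DefaultThresholdBytes): Strategy =
    if (hintVisible) Broadcast
    else estimatedBytes match {
      case Some(bytes) if thresholdBytes > 0 && bytes <= thresholdBytes => Broadcast
      case _ => ShuffleSortMerge
    }
}
```

For example, `choose(hintVisible = false, estimatedBytes = None)` returns `ShuffleSortMerge` even though the real file is tiny, because the model only acts on what the planner can see.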

Complete plan -->
https://www.dropbox.com/s/apuomw1dg0t1jtc/plan_with_select.txt?dl=0
DAG from UI -->
https://www.dropbox.com/s/4xc9d0rdkx2fun8/DAG_with_select.PNG?dl=0


== Optimized Logical Plan ==
Project [...]
+- Join LeftOuter, Some((start_ip#48L = start_ip_num#144L))
   :- Project [...]
   :  +- Join Inner, Some((cast(creative_id#9 as bigint) = creative_id#130L))
   :     :- Project [...]
   :     :  +- Join Inner, Some((cast(strategy_id#10 as bigint) = strategy_id#126L))
   :     :     :- Project [...]
   :     :     :  +- Join LeftOuter, Some((cast(exchange_id#13 as bigint) = id#142L))
   :     :     :     :- Project [...]
   :     :     :     :  +- Join LeftOuter, Some((browser_id#59 = technology_key#169))
   :     :     :     :     :- Project [...]
   :     :     :     :     :  +- Join LeftOuter, Some((primary_browser_language#61 = id#166))
   :     :     :     :     :     :- Project [...]
   :     :     :     :     :     :  +- Filter ((NOT (campaign_id#12 = 0) && (mm_int_cost#36 < 1000000.0)) && ((cost_sum#41 < 1000000.0) && (total_spend#42 < 1000000.0)))
   :     :     :     :     :     :     +- Relation[...)
   :     :     :     :     :     +- Project [id#166,two_letter_code#167]
   :     :     :     :     :        +- BroadcastHint
   :     :     :     :     :           +- Relation[...
   :     :     :     :     +- BroadcastHint
   :     :     :     :        +- Relation[...
   :     :     :     +- Project [description#141,id#142L]
   :     :     :        +- BroadcastHint
   :     :     :           +- Relation[description#141,id#142L,name#143] JSONRelation

== Physical Plan ==
Project [...]
+- SortMergeOuterJoin [start_ip#48L], [start_ip_num#144L], LeftOuter, None
   :- Sort [start_ip#48L ASC], false, 0
   :  +- TungstenExchange hashpartitioning(start_ip#48L,480), None
   :     +- Project [...]
   :        +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#130L], BuildRight
   :           :- Project [...]
   :           :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#126L], BuildRight
   :           :     :- Project [...]
   :           :     :  +- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#142L], LeftOuter, None
   :           :     :     :- Project [...]
   :           :     :     :  +- BroadcastHashOuterJoin [browser_id#59], [technology_key#169], LeftOuter, None
   :           :     :     :     :- Project [...]
   :           :     :     :     :  +- SortMergeOuterJoin [primary_browser_language#61], [id#166], LeftOuter, None
   :           :     :     :     :     :- Sort [primary_browser_language#61 ASC], false, 0
   :           :     :     :     :     :  +- TungstenExchange hashpartitioning(primary_browser_language#61,480), None
   :           :     :     :     :     :     +- Project [...]
   :           :     :     :     :     :        +- Filter (((NOT (campaign_id#12 = 0) && (mm_int_cost#36 < 1000000.0)) && (cost_sum#41 < 1000000.0)) && (total_spend#42 < 1000000.0))
   :           :     :     :     :     :           +- Scan CsvRelation(<function0>,Some(s3://
   :           :     :     :     :     +- Sort [id#166 ASC], false, 0
   :           :     :     :     :        +- TungstenExchange hashpartitioning(id#166,480), None
   :           :     :     :     :           +- Project [id#166,two_letter_code#167]
   :           :     :     :     :              +- Scan CsvRelation(<function0>,Some(s3
   :           :     :     :     +- ConvertToUnsafe
   :           :     :     :        +- Scan CsvRelation(<function0>,Some(s3://
   :           :     :     +- Project [description#141,id#142L]
   :           :     :        +- Scan JSONRelation[description#141,id#142L,name#143] InputPaths: s3://
   :           :     +- Project
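One way to spot the shuffles without the DAG view is to scan the physical plan text (for example the `== Physical Plan ==` section printed by `explain(true)`) for exchange and sort-merge operators. Below is a small plain-Scala sketch; the marker list is drawn from Spark 1.x operator names, including the ones in the plan above, and is illustrative rather than exhaustive. `PlanShuffleScan` is a hypothetical name.

```scala
// Scan the text of a physical plan for operators that imply a shuffle.
// Hypothetical helper; the marker list is illustrative, not exhaustive.
object PlanShuffleScan {
  // Exchange operators move data between partitions; sort-merge joins
  // require both inputs to be partitioned and sorted by the join key.
  val ShuffleMarkers: Set[String] =
    Set("Exchange", "TungstenExchange", "SortMergeJoin", "SortMergeOuterJoin")

  // Returns (1-based line number, operator name) for each matching line.
  def find(plan: String): Seq[(Int, String)] =
    plan.split("\n").toSeq.zipWithIndex.flatMap { case (line, i) =>
      // Drop the tree-drawing prefix (spaces, ':', '+', '-') before the operator.
      val body = line.dropWhile(c => c == ' ' || c == ':' || c == '+' || c == '-')
      // The operator name ends at the first space, '(' or '['.
      val name = body.takeWhile(c => c != ' ' && c != '(' && c != '[')
      if (ShuffleMarkers.contains(name)) Some((i + 1, name)) else None
    }
}
```

Run against the plan above, it flags both SortMergeOuterJoin operators and the three TungstenExchange steps, while the BroadcastHashJoin and BroadcastHashOuterJoin lines pass through unflagged.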

Re: Broadcast join on multiple dataframes

Posted by Srikanth <sr...@gmail.com>.
Michael,

The output of DF.queryExecution is saved at
https://www.dropbox.com/s/1vizuwpswza1e3x/plan.txt?dl=0
I don't see anything in it that suggests a switch in strategy. Hopefully you
find it helpful.

Srikanth

On Thu, Jan 28, 2016 at 4:43 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> Can you provide the analyzed and optimized plans (explain(true))

Re: Broadcast join on multiple dataframes

Posted by Michael Armbrust <mi...@databricks.com>.
Can you provide the analyzed and optimized plans (explain(true))?
