Posted to dev@spark.apache.org by zhangliyun <ke...@126.com> on 2019/07/29 05:12:12 UTC

How to force sorted merge join to broadcast join

Hi all,
   I want to ask a question about broadcast joins in Spark SQL.




```
   select A.*,B.nsf_cards_ratio * 1.00 / A.nsf_on_entry as nsf_ratio_to_pop
from B
left join A
on trim(A.country) = trim(B.cntry_code);
```
Here A is a small table with only 8 rows, but the statistics for table A seem to be wrong.


The join between A and B is planned as a sort merge join, even though the join key (trim(A.country) = trim(B.cntry_code))
has only a few values (nearly 21 countries). Is there any way to force Spark SQL to use a
broadcast join? (I cannot simply raise spark.sql.autoBroadcastJoinThreshold, because I do not know the table size that Spark SQL estimates.)


I tried printing the physical plan, but it did not show the table size, so I do not know
what value of spark.sql.autoBroadcastJoinThreshold would turn the sort merge join into a
broadcast join.
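
For reference, a minimal sketch of how the threshold mentioned above can be inspected and changed from the Spark SQL command line. The 100 MB value is only an illustrative assumption, not a recommendation. If the table statistics are missing or wrong, the estimated size may be far above any reasonable threshold, so raising it may not help.

```sql
-- Show the current value (the documented default is 10485760 bytes, i.e. 10 MB).
SET spark.sql.autoBroadcastJoinThreshold;

-- Raise the threshold to 100 MB for the current session (illustrative value).
SET spark.sql.autoBroadcastJoinThreshold = 104857600;
```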




```
== Parsed Logical Plan ==
'Project [ArrayBuffer(cc_base_part1).*, (('cc_base_part1.nsf_cards_ratio * 1.00) / 'cc_rank_agg.nsf_on_entry) AS nsf_ratio_to_pop#369]
+- 'Join LeftOuter, ('trim('cc_base_part1.country) = 'trim('cc_rank_agg.cntry_code))
   :- 'UnresolvedRelation `cc_base_part1`
   +- 'UnresolvedRelation `cc_rank_agg`


== Analyzed Logical Plan ==
cust_id: string, country: string, cc_id: decimal(38,0), bin_hmac: string, credit_card_created_date: string, card_usage: smallint, cc_category: string, cc_size: string, nsf_risk: string, nsf_cards_ratio: decimal(18,2), dt: string, nsf_ratio_to_pop: decimal(38,6)
Project [cust_id#372, country#373, cc_id#374, bin_hmac#375, credit_card_created_date#376, card_usage#377, cc_category#378, cc_size#379, nsf_risk#380, nsf_cards_ratio#381, dt#382, CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(nsf_cards_ratio#381 as decimal(18,2))) * promote_precision(cast(1.00 as decimal(18,2)))), DecimalType(22,4)) as decimal(38,16))) / promote_precision(cast(nsf_on_entry#386 as decimal(38,16)))), DecimalType(38,6)) AS nsf_ratio_to_pop#369]
+- Join LeftOuter, (trim(country#373, None) = trim(cntry_code#383, None))
   :- SubqueryAlias cc_base_part1
   :  +- HiveTableRelation `fpv365h`.`cc_base_part1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cust_id#372, country#373, cc_id#374, bin_hmac#375, credit_card_created_date#376, card_usage#377, cc_category#378, cc_size#379, nsf_risk#380, nsf_cards_ratio#381], [dt#382]
   +- SubqueryAlias cc_rank_agg
      +- HiveTableRelation `fpv365h`.`cc_rank_agg`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [cntry_code#383, num_tot_cards#384L, num_nsf_cards#385L, nsf_on_entry#386], [dt#387]






```


Does Spark have a command that shows the table size when printing the physical plan? I would appreciate any help with this question.




Best regards


Kelly Zhang

Re: Re: How to force sorted merge join to broadcast join

Posted by Wenchen Fan <cl...@gmail.com>.
You can try EXPLAIN COST query and see if it works for you.
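
A minimal sketch of that suggestion, applied to the query from the original post (table names A and B as used there). In Spark versions that support EXPLAIN COST, the output should include the optimized logical plan annotated with whatever statistics Spark has for each node, such as the estimated size in bytes. The exact output format depends on the Spark version.

```sql
-- Print the logical plan together with the statistics Spark uses for planning.
EXPLAIN COST
SELECT A.*, B.nsf_cards_ratio * 1.00 / A.nsf_on_entry AS nsf_ratio_to_pop
FROM B
LEFT JOIN A
  ON trim(A.country) = trim(B.cntry_code);
```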


Re:Re: How to force sorted merge join to broadcast join

Posted by Rubén Berenguel <rb...@gmail.com>.
I don't think there is a way of doing that (at least I don't remember one
right now). The closest thing I remember is that you can run the SQL
"ANALYZE TABLE table_name COMPUTE STATISTICS" to compute them without having
to run a query (this also feeds the cost-based optimiser, if I remember
correctly), but whether they can be displayed escapes me right now.
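
As a sketch of the statement mentioned above, using the table name A from the original post. The NOSCAN variant and the DESCRIBE EXTENDED check are additions for illustration. In Spark versions that record statistics in the catalog, the DESCRIBE output should include a statistics entry once they have been computed.

```sql
-- Scan the table and store its size and row count in the catalog.
ANALYZE TABLE A COMPUTE STATISTICS;

-- Cheaper variant: collect only the size in bytes, without scanning rows.
ANALYZE TABLE A COMPUTE STATISTICS NOSCAN;

-- Inspect the table metadata, including any stored statistics.
DESCRIBE EXTENDED A;
```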

R

-- 
Rubén Berenguel


Re:Re: How to force sorted merge join to broadcast join

Posted by zhangliyun <ke...@126.com>.
Thanks! After using the syntax provided in the link, select /*+ BROADCAST (A) */ ..., I got what I wanted.
But I want to ask: besides using queryExecution.stringWithStats (DataFrame API) to show the table statistics, is there any way to show them with an explain xxx statement in the Spark SQL command line?
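
For readers following along, the hinted query would look roughly like this. It is a sketch that reuses the query from the original post unchanged apart from the hint. The name inside the hint has to match the table name or alias used in the query. Because this is a left outer join, only the right-hand side (A here) can be broadcast.

```sql
-- Ask the planner to broadcast table A instead of using a sort merge join.
SELECT /*+ BROADCAST(A) */
       A.*,
       B.nsf_cards_ratio * 1.00 / A.nsf_on_entry AS nsf_ratio_to_pop
FROM B
LEFT JOIN A
  ON trim(A.country) = trim(B.cntry_code);
```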


Best Regards
Kelly





Re: How to force sorted merge join to broadcast join

Posted by Rubén Berenguel <rb...@gmail.com>.
Hi, I hope this answers your question.

You can hint the broadcast in SQL as detailed here:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html
(thanks, Jacek :) )
I'd recommend creating a temporary table with the trimming you use in the
join (for clarity). Also keep in mind that using the DataFrame methods is
more powerful/readable than using Spark SQL directly (as happens in the
broadcast case), although it depends on personal preference.
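
One possible reading of the temporary-table suggestion, as a sketch only. The view name A_trimmed and the column name country_key are hypothetical, and the tables A and B are the ones from the original post. The trimming is done once in the view, so the join condition on that side becomes a plain column comparison.

```sql
-- Hypothetical temporary view that precomputes the trimmed join key for A.
CREATE OR REPLACE TEMPORARY VIEW A_trimmed AS
SELECT A.*, trim(A.country) AS country_key
FROM A;

-- Join against the view; a broadcast hint naming A_trimmed could be added here too.
SELECT A_trimmed.*,
       B.nsf_cards_ratio * 1.00 / A_trimmed.nsf_on_entry AS nsf_ratio_to_pop
FROM B
LEFT JOIN A_trimmed
  ON A_trimmed.country_key = trim(B.cntry_code);
```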

Regards,

Ruben

-- 
Rubén Berenguel
