Posted to user@spark.apache.org by Srikanth <sr...@gmail.com> on 2015/07/01 17:30:20 UTC

BroadcastHashJoin when RDD is not cached

Hello,



I have a straightforward use case of joining a large table with a smaller
table. The small table is within the limit I set for
spark.sql.autoBroadcastJoinThreshold.

I notice that ShuffledHashJoin is used to perform the join.
BroadcastHashJoin was used only when I pre-fetched and cached the small
table.

I understand that for a typical broadcast we would have to read and collect()
the small table on the driver before broadcasting it.

Why not do this automatically for joins? That way stage 1 (read large table)
and stage 2 (read small table) can still be run in parallel.





Sort [emailId#19 ASC,date#0 ASC], true
 Exchange (RangePartitioning 24)
  Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
   Filter ((lowerTime#22 <= date#0) && (date#0 <= upperTime#23))
    *ShuffledHashJoin* [ip#7], [ip#18], BuildRight
     Exchange (HashPartitioning 24)
      Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L]
       PhysicalRDD [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6], MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25
     Exchange (HashPartitioning 24)
      Project [emailId#19,scalaUDF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22]
       PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41


Srikanth

Re: BroadcastHashJoin when RDD is not cached

Posted by Srikanth <sr...@gmail.com>.
Good to know this will be in the next release. Thanks.

On Wed, Jul 1, 2015 at 3:13 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> We don't know that the table is small unless you cache it.  In Spark 1.5
> you'll be able to give us a hint though (
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581
> )

Re: BroadcastHashJoin when RDD is not cached

Posted by Michael Armbrust <mi...@databricks.com>.
We don't know that the table is small unless you cache it.  In Spark 1.5
you'll be able to give us a hint though (
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581
)
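
For readers following along, here is a minimal sketch of how the hint at the
link above (the broadcast function in org.apache.spark.sql.functions, available
from Spark 1.5) might be used. The case classes, column names, sample rows, and
the local master setting are assumptions made up for illustration; they only
loosely follow the plan in the original post and are not the poster's code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.broadcast

object BroadcastHintSketch {
  // Hypothetical record types standing in for the large and small tables.
  case class Access(ip: String, date: Long, uri: String)
  case class User(ip: String, emailId: String)

  // Builds two tiny DataFrames and joins them, marking the small side
  // with the broadcast hint so the planner does not need size statistics.
  def joinWithHint(sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._
    val sc = sqlContext.sparkContext

    val large = sc.parallelize(Seq(
      Access("10.0.0.1", 100L, "/a"),
      Access("10.0.0.2", 200L, "/b"),
      Access("10.0.0.3", 300L, "/c")
    )).toDF()

    val small = sc.parallelize(Seq(
      User("10.0.0.1", "a@example.com"),
      User("10.0.0.2", "b@example.com")
    )).toDF()

    // broadcast(small) only marks the DataFrame; nothing is collected
    // until the join is actually executed.
    large.join(broadcast(small), large("ip") === small("ip"))
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption so the sketch can run on one machine.
    val conf = new SparkConf().setAppName("broadcast-hint-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val joined = joinWithHint(new SQLContext(sc))
      joined.explain() // inspect the physical plan chosen for the join
      joined.show()
    } finally {
      sc.stop()
    }
  }
}
```

With the hint in place, the output of explain() would be expected to show a
BroadcastHashJoin rather than a ShuffledHashJoin, without caching the small
table first. The exact plan text depends on the Spark version.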
