Posted to user@spark.apache.org by Rishi Shah <ri...@gmail.com> on 2020/09/17 04:13:09 UTC

[pyspark 2.4] broadcasting DataFrame throws error

Hello All,

Hope this email finds you well. I have a DataFrame of size 8 TB (Parquet,
Snappy compressed). When I group it by a column, I get a much smaller
aggregated DataFrame of only 700 rows (just two columns, key and count).
However, when I use this aggregated result in a broadcast join as shown below,
Spark throws an error saying the DataFrame cannot be broadcast.

from pyspark.sql.functions import broadcast

df_agg = df.groupBy('column1').count().cache()
# df_agg.count()  # uncomment to materialize the cache eagerly
df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
df_join.write.parquet('PATH')
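A quick way to sanity-check this (a sketch, not part of the original message;
it reuses the spark session and the df_join built above) is to look at the
configured broadcast threshold and at the plan Spark actually chose:

# Hedged diagnostic sketch, reusing spark and df_join from the snippet above.
# The auto-broadcast threshold is in bytes; -1 disables automatic broadcasting.
print(spark.conf.get('spark.sql.autoBroadcastJoinThreshold'))

# A broadcast join shows up in the physical plan as
# BroadcastHashJoin / BroadcastExchange rather than SortMergeJoin.
df_join.explain()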

The same code works without any modifications when the input df is 3 TB.

Any suggestions?

-- 
Regards,

Rishi Shah

Re: [pyspark 2.4] broadcasting DataFrame throws error

Posted by Rishi Shah <ri...@gmail.com>.
Thanks Amit, I was referring to dynamic partition pruning (
https://issues.apache.org/jira/browse/SPARK-11150) and adaptive query
execution (https://issues.apache.org/jira/browse/SPARK-31412) in Spark 3,
where Spark figures out the right partitions and pushes the filters down to
the input before applying the join.
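For reference, a minimal sketch of how those two Spark 3 features are enabled
(config names as documented for Spark 3.0; the app name and values here are
placeholders, not from this thread):

# Minimal Spark 3 sketch; assumes Spark 3.0+.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('broadcast-agg-join')
         # Adaptive query execution (SPARK-31412): re-optimizes the plan at
         # runtime, e.g. converting a sort-merge join into a broadcast join
         # once the aggregate turns out to be small.
         .config('spark.sql.adaptive.enabled', 'true')
         # Dynamic partition pruning (SPARK-11150): pushes join-key filters
         # down to the partitioned input before the join runs.
         .config('spark.sql.optimizer.dynamicPartitionPruning.enabled', 'true')
         .getOrCreate())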

On Sat, Sep 19, 2020 at 1:31 AM Amit Joshi <ma...@gmail.com>
wrote:

> Hi Rishi,
>
> Maybe you have already done these steps.
> Can you check the size of the dataframe you are trying to broadcast using
> logInfo(SizeEstimator.estimate(df))
> and adjust the driver memory accordingly?
>
> There is one more issue I found in Spark 2:
> broadcast does not work on cached data. It is possible this may not be the
> issue, but you can check for the same problem on your end.
>
>
> https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219
>
> And can you please tell me which issue you are referring to that was
> solved in Spark 3?
>
> Regards
> Amit
>
>
> On Saturday, September 19, 2020, Rishi Shah <ri...@gmail.com>
> wrote:
>
>> Thanks Amit. I have tried increasing the driver memory and also the max
>> result size returned to the driver. Nothing works. I believe Spark is not
>> able to determine that the result to be broadcast is small enough, because
>> the input data is huge. When I tried this in two stages, writing out the
>> grouped data and using that to join with a broadcast, Spark had no issues
>> broadcasting it.
>>
>> When I was checking the Spark 3 documentation, it seemed like this issue
>> may have been addressed in Spark 3 but not in earlier versions?
>>
>> On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi <ma...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I think the problem lies with driver memory. Broadcast in Spark works by
>>> collecting all the data to the driver and then having the driver broadcast
>>> it to all the executors. A different strategy, like BitTorrent, could be
>>> employed for the transfer, though.
>>>
>>> Please try increasing the driver memory and see if it works.
>>>
>>> Regards,
>>> Amit
>>>
>>>
>>> On Thursday, September 17, 2020, Rishi Shah <ri...@gmail.com>
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> Hope this email finds you well. I have a DataFrame of size 8 TB (Parquet,
>>>> Snappy compressed). When I group it by a column, I get a much smaller
>>>> aggregated DataFrame of only 700 rows (just two columns, key and count).
>>>> However, when I use this aggregated result in a broadcast join as shown
>>>> below, Spark throws an error saying the DataFrame cannot be broadcast.
>>>>
>>>> df_agg = df.groupBy('column1').count().cache()
>>>> # df_agg.count()
>>>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>>>> df_join.write.parquet('PATH')
>>>>
>>>> The same code works with input df size of 3TB without any
>>>> modifications.
>>>>
>>>> Any suggestions?
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Rishi Shah
>>>>
>>>
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah

Re: [pyspark 2.4] broadcasting DataFrame throws error

Posted by Amit Joshi <ma...@gmail.com>.
Hi Rishi,

Maybe you have already done these steps.
Can you check the size of the dataframe you are trying to broadcast using
logInfo(SizeEstimator.estimate(df))
and adjust the driver memory accordingly?
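As a rough sketch of that check from PySpark (SizeEstimator is a JVM-side
utility, so it has to be reached through the py4j gateway; the exact call
chain below is an assumption and may differ between Spark versions):

# Rough sketch, assuming PySpark 2.4 with an existing spark session and df_agg.
jvm = spark.sparkContext._jvm

# SizeEstimator measures the JVM Dataset object itself, not the materialized
# rows, so treat the number only as a coarse sanity check.
estimated = jvm.org.apache.spark.util.SizeEstimator.estimate(df_agg._jdf)
print('SizeEstimator estimate (bytes):', estimated)

# The optimizer's own size statistic is what drives broadcast decisions.
plan_size = df_agg._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
print('optimizer sizeInBytes:', plan_size)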

There is one more issue I found in Spark 2:
broadcast does not work on cached data. It is possible this may not be the
issue, but you can check for the same problem on your end.

https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219
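If that cache interaction is the culprit here, one possible workaround (a
sketch based on the pointer above, not something verified in this thread) is
to leave the small aggregate un-cached so the broadcast hint applies to the
original plan:

# Sketch of a possible workaround, assuming the same input df as in the
# original post: skip .cache() on the aggregate so the hint is not affected
# by the cached-plan substitution described above.
from pyspark.sql.functions import broadcast

df_agg = df.groupBy('column1').count()   # no .cache() here
df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
df_join.write.parquet('PATH')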

And can you please tell me which issue you are referring to that was solved
in Spark 3?

Regards
Amit


On Saturday, September 19, 2020, Rishi Shah <ri...@gmail.com>
wrote:

> Thanks Amit. I have tried increasing the driver memory and also the max
> result size returned to the driver. Nothing works. I believe Spark is not
> able to determine that the result to be broadcast is small enough, because
> the input data is huge. When I tried this in two stages, writing out the
> grouped data and using that to join with a broadcast, Spark had no issues
> broadcasting it.
>
> When I was checking the Spark 3 documentation, it seemed like this issue
> may have been addressed in Spark 3 but not in earlier versions?
>
> On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi <ma...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I think the problem lies with driver memory. Broadcast in Spark works by
>> collecting all the data to the driver and then having the driver broadcast
>> it to all the executors. A different strategy, like BitTorrent, could be
>> employed for the transfer, though.
>>
>> Please try increasing the driver memory and see if it works.
>>
>> Regards,
>> Amit
>>
>>
>> On Thursday, September 17, 2020, Rishi Shah <ri...@gmail.com>
>> wrote:
>>
>>> Hello All,
>>>
>>> Hope this email finds you well. I have a DataFrame of size 8 TB (Parquet,
>>> Snappy compressed). When I group it by a column, I get a much smaller
>>> aggregated DataFrame of only 700 rows (just two columns, key and count).
>>> However, when I use this aggregated result in a broadcast join as shown
>>> below, Spark throws an error saying the DataFrame cannot be broadcast.
>>>
>>> df_agg = df.groupBy('column1').count().cache()
>>> # df_agg.count()
>>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>>> df_join.write.parquet('PATH')
>>>
>>> The same code works with input df size of 3TB without any modifications.
>>>
>>> Any suggestions?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>
> --
> Regards,
>
> Rishi Shah
>

Re: [pyspark 2.4] broadcasting DataFrame throws error

Posted by Rishi Shah <ri...@gmail.com>.
Thanks Amit. I have tried increasing the driver memory and also the max
result size returned to the driver. Nothing works. I believe Spark is not
able to determine that the result to be broadcast is small enough, because
the input data is huge. When I tried this in two stages, writing out the
grouped data and using that to join with a broadcast, Spark had no issues
broadcasting it.
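A minimal sketch of that two-stage approach (AGG_PATH is a placeholder path,
and the same spark session and input df as before are assumed):

# Stage 1: materialize the small aggregate to storage.
df.groupBy('column1').count().write.mode('overwrite').parquet('AGG_PATH')

# Stage 2: read it back (its size is now known and tiny) and broadcast it.
from pyspark.sql.functions import broadcast

df_agg = spark.read.parquet('AGG_PATH')
df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
df_join.write.parquet('PATH')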

When I was checking the Spark 3 documentation, it seemed like this issue may
have been addressed in Spark 3 but not in earlier versions?

On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi <ma...@gmail.com>
wrote:

> Hi,
>
> I think the problem lies with driver memory. Broadcast in Spark works by
> collecting all the data to the driver and then having the driver broadcast
> it to all the executors. A different strategy, like BitTorrent, could be
> employed for the transfer, though.
>
> Please try increasing the driver memory and see if it works.
>
> Regards,
> Amit
>
>
> On Thursday, September 17, 2020, Rishi Shah <ri...@gmail.com>
> wrote:
>
>> Hello All,
>>
>> Hope this email finds you well. I have a DataFrame of size 8 TB (Parquet,
>> Snappy compressed). When I group it by a column, I get a much smaller
>> aggregated DataFrame of only 700 rows (just two columns, key and count).
>> However, when I use this aggregated result in a broadcast join as shown
>> below, Spark throws an error saying the DataFrame cannot be broadcast.
>>
>> df_agg = df.groupBy('column1').count().cache()
>> # df_agg.count()
>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>> df_join.write.parquet('PATH')
>>
>> The same code works with input df size of 3TB without any modifications.
>>
>> Any suggestions?
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>

-- 
Regards,

Rishi Shah

Re: [pyspark 2.4] broadcasting DataFrame throws error

Posted by Amit Joshi <ma...@gmail.com>.
Hi,

I think the problem lies with driver memory. Broadcast in Spark works by
collecting all the data to the driver and then having the driver broadcast
it to all the executors. A different strategy, like BitTorrent, could be
employed for the transfer, though.

Please try increasing the driver memory and see if it works.
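For concreteness, a sketch of the two driver-side knobs being discussed (the
sizes are placeholders; spark.driver.memory normally has to be set before the
driver JVM starts, e.g. via spark-submit --driver-memory, rather than from an
already-running session):

# Hedged sketch; the sizes are illustrative, not recommendations.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Heap available to the driver, which holds the collected rows before
         # they are broadcast. Only takes effect if no session is running yet.
         .config('spark.driver.memory', '16g')
         # Cap on the total size of results collected back to the driver.
         .config('spark.driver.maxResultSize', '4g')
         .getOrCreate())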

Regards,
Amit


On Thursday, September 17, 2020, Rishi Shah <ri...@gmail.com>
wrote:

> Hello All,
>
> Hope this email finds you well. I have a DataFrame of size 8 TB (Parquet,
> Snappy compressed). When I group it by a column, I get a much smaller
> aggregated DataFrame of only 700 rows (just two columns, key and count).
> However, when I use this aggregated result in a broadcast join as shown
> below, Spark throws an error saying the DataFrame cannot be broadcast.
>
> df_agg = df.groupBy('column1').count().cache()
> # df_agg.count()
> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
> df_join.write.parquet('PATH')
>
> The same code works with input df size of 3TB without any modifications.
>
> Any suggestions?
>
> --
> Regards,
>
> Rishi Shah
>