Posted to dev@spark.apache.org by gen tang <ge...@gmail.com> on 2015/08/11 16:11:35 UTC

Potential bug broadcastNestedLoopJoin or default value of spark.sql.autoBroadcastJoinThreshold

Hi,

Recently, I used Spark SQL to do a join on a non-equality condition, for
example condition1 OR condition2.

Spark will use BroadcastNestedLoopJoin to do this. Assume that one of the
dataframes (df1) is created neither from a Hive table nor from a local
collection, and the other one (df2) is created from a Hive table. For df1,
Spark will use defaultSizeInBytes * length of df1 to estimate its size,
while it uses the correct size for df2.

As a result, in most cases Spark will think df1 is bigger than df2 even
when df2 is really huge. Spark will then broadcast df2 (effectively a
df2.collect() on the driver), which causes errors or slows the program down.

Maybe we should just use defaultSizeInBytes for LogicalRDD, not
defaultSizeInBytes * length?
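To make the failure mode concrete, here is a tiny plain-Python model of the estimate described above (the constants and row counts are made up for illustration; this is not Spark's actual statistics code):

```python
# Illustrative model of the size estimates described above (made-up
# numbers, not Spark's real code).
DEFAULT_SIZE_IN_BYTES = 10 * 1024 * 1024 + 1  # stand-in for spark.sql.defaultSizeInBytes

def estimated_size(num_rows, real_row_size, from_hive):
    """A Hive-backed plan reports real statistics; an RDD-backed plan is
    estimated as defaultSizeInBytes per row, as described above."""
    if from_hive:
        return num_rows * real_row_size
    return num_rows * DEFAULT_SIZE_IN_BYTES

# df1: a tiny RDD-backed dataframe; df2: a huge Hive table.
df1_size = estimated_size(num_rows=1000, real_row_size=100, from_hive=False)
df2_size = estimated_size(num_rows=100 * 1000 * 1000, real_row_size=100, from_hive=True)

# The side with the smaller estimate gets broadcast, so the huge Hive
# table wins: 1000 rows are estimated "bigger" than 100 million rows.
broadcast_side = "df2" if df2_size < df1_size else "df1"
print(broadcast_side)  # df2
```

Under this model the 1000-row dataframe is estimated at roughly 10 GB, so the planner picks the 100-million-row table for broadcast.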

I hope this is helpful.
Thanks a lot in advance for your help and input.

Cheers
Gen

Re: Potential bug broadcastNestedLoopJoin or default value of spark.sql.autoBroadcastJoinThreshold

Posted by gen tang <ge...@gmail.com>.
Hi,

Thanks a lot.

The problem is not doing a non-equi join on large tables; in fact, one
table is really small and the other one is huge.

The problem is that Spark can only get the correct size for a dataframe
created directly from a Hive table. Even when we create a dataframe from a
local collection, it uses defaultSizeInBytes as its size. (Here I am really
confused: why don't we use LogicalLocalTable in ExistingRDD.scala to
estimate its size? As I understand it, this case class was created for that
purpose.)

Then if we do a join or unionAll operation on this dataframe, the
estimated size explodes.

For instance, if we do a join, val df = df1.join(df2, condition), then
df.queryExecution.analyzed.statistics.sizeInBytes = sizeInBytes(df1) *
sizeInBytes(df2).
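Sketching that explosion numerically (plain Python; the default estimate and the multiplication rule are the ones described above, not a call into Spark):

```python
# Illustrative arithmetic for the statistics explosion described above.
DEFAULT_SIZE_IN_BYTES = 10 * 1024 * 1024 + 1   # stand-in default estimate
AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024    # spark.sql.autoBroadcastJoinThreshold default

# Each RDD-backed relation starts at the default estimate; a join's
# estimate is the product of its children's estimates.
df1_size = DEFAULT_SIZE_IN_BYTES
df2_size = DEFAULT_SIZE_IN_BYTES
joined_size = df1_size * df2_size  # ~1.1e14 bytes after a single join

# One join already puts the estimate about seven orders of magnitude past
# the broadcast threshold, so every later decision based on it is wrong.
print(joined_size > AUTO_BROADCAST_THRESHOLD)  # True
```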

In my case, I created the df1 instance from an existing RDD.

I found a workaround for this problem:
1. save df1 in a Hive table
2. read this Hive table and create a new dataframe
3. do the outer join with this new dataframe
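For reference, the three steps above might look like this in PySpark 1.4-era code (a hedged sketch, not tested: tmp_df1 is a placeholder table name, and df1, df2, and condition stand for whatever you already have in scope, with sqlContext a HiveContext):

```python
# Sketch of the workaround (assumes an existing HiveContext named
# sqlContext and dataframes df1/df2; tmp_df1 is a placeholder name).
# 1. save df1 in a Hive table
df1.write.saveAsTable("tmp_df1")
# 2. read this Hive table to create a new dataframe with real statistics
df1_hive = sqlContext.table("tmp_df1")
# 3. do the outer join with this new dataframe
df = df1_hive.join(df2, condition, "outer")
```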

Cheers
Gen

On Wed, Aug 12, 2015 at 10:12 AM, Cheng, Hao <ha...@intel.com> wrote:

> Firstly, spark.sql.autoBroadcastJoinThreshold works only for equi-joins.
>
>
>
> Currently, for a non-equi join, if the join type is INNER, it will be
> done by CartesianProduct join, and BroadcastNestedLoopJoin is used for
> the outer joins.
>
>
>
> In BroadcastNestedLoopJoin, the table with the smaller estimated size
> will be broadcast, but if the smaller table is itself huge, I don’t
> think Spark SQL can handle that right now (OOM).
>
>
>
> So, I am not sure how you created the df1 instance, but we’d better
> reflect the real size in its statistics and let the framework decide
> what to do; hopefully Spark SQL can support non-equi joins on large
> tables in the next release.
>
>
>
> Hao

RE: Potential bug broadcastNestedLoopJoin or default value of spark.sql.autoBroadcastJoinThreshold

Posted by "Cheng, Hao" <ha...@intel.com>.
Firstly, spark.sql.autoBroadcastJoinThreshold works only for equi-joins.

Currently, for a non-equi join, if the join type is INNER, it will be done by CartesianProduct join, and BroadcastNestedLoopJoin is used for the outer joins.
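That dispatch rule can be mimicked in a few lines (plain Python, a deliberate simplification of Catalyst's planning strategies, not the real planner code):

```python
# Simplified model of the planning rule described above: with no
# equi-join keys, an INNER join becomes a CartesianProduct and the
# outer joins become BroadcastNestedLoopJoin.
def non_equi_join_strategy(join_type):
    if join_type == "inner":
        return "CartesianProduct"
    if join_type in ("left_outer", "right_outer", "full_outer"):
        return "BroadcastNestedLoopJoin"
    raise ValueError("unhandled join type: %s" % join_type)

print(non_equi_join_strategy("inner"))       # CartesianProduct
print(non_equi_join_strategy("full_outer"))  # BroadcastNestedLoopJoin
```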

In BroadcastNestedLoopJoin, the table with the smaller estimated size will be broadcast, but if the smaller table is itself huge, I don’t think Spark SQL can handle that right now (OOM).

So, I am not sure how you created the df1 instance, but we’d better reflect the real size in its statistics and let the framework decide what to do; hopefully Spark SQL can support non-equi joins on large tables in the next release.

Hao
