Posted to dev@spark.apache.org by Tomas Bartalos <to...@gmail.com> on 2021/04/07 23:51:24 UTC

Big Broadcast Hash Join with Dynamic Partition Pruning gives wrong results

When I try to do a Broadcast Hash Join on a bigger table (6Mil rows), I get
an incorrect result of 0 rows.

val rightDF = spark.read.format("parquet").load("table-a")
val leftDF =  spark.read.format("parquet").load("table-b")
  //needed to activate dynamic pruning subquery
  .where('part_ts === 20210304000L)

// leftDF has 7 Mil rows ~ 120 MB
val join = broadcast(leftDF).join(rightDF,
  $"match_part_id" === $"part_id" && $"match_id" === $"id"
)
join.count

res1: Long = 0

I think it's connected with Dynamic Partition Pruning of rightDF, which,
according to the plan, is being applied:

PartitionFilters: [isnotnull(part_id#477L), dynamicpruningexpression(part_id#477L IN dynamicpruning#534)]

===== Subqueries =====

Subquery:1 Hosting operator id = 6 Hosting Expression = part_id#477L IN dynamicpruning#534
ReusedExchange (11)


(11) ReusedExchange [Reuses operator id: 5]
Output [4]: [match_part_id#487L, match_id#488L, UK#489, part_ts#490L]
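
One way to check whether the pruning subquery is really what empties the result is to rerun the same join with Dynamic Partition Pruning switched off. The snippet below is only a sketch of that check, not a fix: it assumes the same spark-shell session (so spark and the $ syntax are in scope) with leftDF and rightDF defined as above, and the name joinNoDpp is made up for illustration.

```scala
import org.apache.spark.sql.functions.broadcast

// Disable dynamic partition pruning for the current session only.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")

// Same join as above; joinNoDpp is a hypothetical name used only here.
val joinNoDpp = broadcast(leftDF).join(rightDF,
  $"match_part_id" === $"part_id" && $"match_id" === $"id"
)

// Inspect PartitionFilters to see whether dynamicpruningexpression
// is still present in the plan for this run.
joinNoDpp.explain("formatted")

joinNoDpp.count
```

If the count becomes non-zero with pruning disabled and the broadcast hint still in place, that would point at the pruning subquery rather than at the broadcast join itself.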

*Removing the broadcast hint OR shrinking the broadcasted table corrects
the result*:

val rightDF = spark.read.format("parquet").load("table-a")
val leftDF =  spark.read.format("parquet").load("table-b")
  //needed to activate dynamic pruning subquery
  .where('part_ts === 20210304000L)
  // shrinks the broadcasted table to 18K rows
  .where('match_id === 33358792)

// leftDF has 18K rows
val join = broadcast(leftDF).join(rightDF,
  $"match_part_id" === $"part_id" && $"match_id" === $"id"
)
join.count

res2: Long = 379701

I would expect the broadcast to fail, but I would never expect to get
incorrect results without an exception. What do you think?


BR,

Tomas

Re: Big Broadcast Hash Join with Dynamic Partition Pruning gives wrong results

Posted by Wenchen Fan <cl...@gmail.com>.
Hi Tomas, thanks for reporting this bug!

Is it possible to share your dataset so that other people can reproduce and
debug it?

On Thu, Apr 8, 2021 at 7:52 AM Tomas Bartalos <to...@gmail.com>
wrote:

> when I try to do a Broadcast Hash Join on a bigger table (6Mil rows) I get
> an incorrect result of 0 rows.
>
> val rightDF = spark.read.format("parquet").load("table-a")
> val leftDF =  spark.read.format("parquet").load("table-b")
>   //needed to activate dynamic pruning subquery
>   .where('part_ts === 20210304000L)
>
> // leftDF has 7 Mil rows ~ 120 MB
> val join = broadcast(leftDF).join(rightDF,
>   $"match_part_id" === $"part_id" && $"match_id" === $"id"
> )
> join.count
>
> res1: Long = 0
>
> I think it's connected with Dynamic Partition Pruning of the rightDF,
> which is happening according to the plan:
>
> PartitionFilters: [isnotnull(part_id#477L), dynamicpruningexpression(part_id#477L IN dynamicpruning#534)]
>
> ===== Subqueries =====
>
> Subquery:1 Hosting operator id = 6 Hosting Expression = part_id#477L IN dynamicpruning#534
> ReusedExchange (11)
>
>
> (11) ReusedExchange [Reuses operator id: 5]
> Output [4]: [match_part_id#487L, match_id#488L, UK#489, part_ts#490L]
>
> *Removing the broadcast hint OR shrinking the broadcasted table corrects
> the result*:
>
> val rightDF = spark.read.format("parquet").load("table-a")
> val leftDF =  spark.read.format("parquet").load("table-b")
>   //needed to activate dynamic pruning subquery
>   .where('part_ts === 20210304000L)
>  // shrinks the broadcasted table to 18K rows
>  .where('match_id === 33358792)
>
> // leftDF has 18K rows
> val join = broadcast(leftDF).join(rightDF,
>   $"match_part_id" === $"part_id" && $"match_id" === $"id"
> )
> join.count
>
> res2: Long = 379701
>
> I would expect the broadcast to fail, but would never expect to get
> incorrect results without an exception. What do you think ?
>
>
> BR,
>
> Tomas
>
