Posted to issues@spark.apache.org by "John Zhuge (JIRA)" <ji...@apache.org> on 2019/01/09 17:13:00 UTC

[jira] [Comment Edited] (SPARK-26576) Broadcast hint not applied to partitioned Parquet table

    [ https://issues.apache.org/jira/browse/SPARK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16737964#comment-16737964 ] 

John Zhuge edited comment on SPARK-26576 at 1/9/19 5:12 PM:
------------------------------------------------------------

The issue does not reproduce on the master branch. Note "rightHint=(broadcast)" on the Join in the Optimized Logical Plan.
{noformat}
scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(dateint))
:- SubqueryAlias `jzhuge`.`parquet_with_part`
:  +- Relation[val#34,dateint#35] parquet
+- ResolvedHint (broadcast)
   +- SubqueryAlias `jzhuge`.`parquet_with_part`
      +- Relation[val#40,dateint#41] parquet

== Analyzed Logical Plan ==
dateint: int, val: string, val: string
Project [dateint#35, val#34, val#40]
+- Join Inner, (dateint#35 = dateint#41)
   :- SubqueryAlias `jzhuge`.`parquet_with_part`
   :  +- Relation[val#34,dateint#35] parquet
   +- ResolvedHint (broadcast)
      +- SubqueryAlias `jzhuge`.`parquet_with_part`
         +- Relation[val#40,dateint#41] parquet

== Optimized Logical Plan ==
Project [dateint#35, val#34, val#40]
+- Join Inner, (dateint#35 = dateint#41), rightHint=(broadcast)
   :- Project [val#34, dateint#35]
   :  +- Filter isnotnull(dateint#35)
   :     +- Relation[val#34,dateint#35] parquet
   +- Project [val#40, dateint#41]
      +- Filter isnotnull(dateint#41)
         +- Relation[val#40,dateint#41] parquet

== Physical Plan ==
*(2) Project [dateint#35, val#34, val#40]
+- *(2) BroadcastHashJoin [dateint#35], [dateint#41], Inner, BuildRight
   :- *(2) FileScan parquet jzhuge.parquet_with_part[val#34,dateint#35] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#35)], PushedFilters: [], ReadSchema: struct<val:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
      +- *(1) FileScan parquet jzhuge.parquet_with_part[val#40,dateint#41] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#41)], PushedFilters: [], ReadSchema: struct<val:string>
{noformat}
From a quick look at the source, EliminateResolvedHint pulls the broadcast hint into the Join and eliminates the ResolvedHint node. It runs before PruneFileSourcePartitions, so the above code in PhysicalOperation.collectProjectsAndFilters is never reached on the master branch in the few cases I tried.
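
For anyone who wants to check other cases without reading the explain output by eye, here is a minimal spark-shell sketch. It assumes the same table as above and the predefined `spark` session; `usesBroadcastHashJoin` is a hypothetical helper name used only for illustration, not part of Spark, and I have not run this against every affected version.

```scala
// Sketch for spark-shell, where `spark` is already defined.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.functions.broadcast

// Hypothetical helper: true if the planned physical plan contains a
// BroadcastHashJoin node. sparkPlan is the physical plan before the
// preparation rules (exchanges, whole-stage codegen) are applied.
def usesBroadcastHashJoin(df: DataFrame): Boolean =
  df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }.nonEmpty

// Disable size-based auto broadcast so that only the explicit hint
// can select a broadcast join, as in the issue description.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df = spark.table("jzhuge.parquet_with_part")
val joined = df.join(broadcast(df), "dateint")

// Print the optimized logical plan to see whether the hint survived,
// then report which join strategy the planner picked.
println(joined.queryExecution.optimizedPlan)
println(s"BroadcastHashJoin chosen: ${usesBroadcastHashJoin(joined)}")
```

Swapping in a different table name should allow the same check for partitioned and non-partitioned tables, or for other file formats.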



> Broadcast hint not applied to partitioned Parquet table
> -------------------------------------------------------
>
>                 Key: SPARK-26576
>                 URL: https://issues.apache.org/jira/browse/SPARK-26576
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: John Zhuge
>            Priority: Major
>
> Broadcast hint is not applied to a partitioned Parquet table. Below, "SortMergeJoin" is chosen incorrectly and "ResolvedHint (broadcast)" is removed in the Optimized Logical Plan.
> {noformat}
> scala> spark.sql("CREATE TABLE jzhuge.parquet_with_part (val STRING) PARTITIONED BY (dateint INT) STORED AS parquet")
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List(dateint))
> :- SubqueryAlias `jzhuge`.`parquet_with_part`
> :  +- Relation[val#28,dateint#29] parquet
> +- ResolvedHint (broadcast)
>    +- SubqueryAlias `jzhuge`.`parquet_with_part`
>       +- Relation[val#32,dateint#33] parquet
> == Analyzed Logical Plan ==
> dateint: int, val: string, val: string
> Project [dateint#29, val#28, val#32]
> +- Join Inner, (dateint#29 = dateint#33)
>    :- SubqueryAlias `jzhuge`.`parquet_with_part`
>    :  +- Relation[val#28,dateint#29] parquet
>    +- ResolvedHint (broadcast)
>       +- SubqueryAlias `jzhuge`.`parquet_with_part`
>          +- Relation[val#32,dateint#33] parquet
> == Optimized Logical Plan ==
> Project [dateint#29, val#28, val#32]
> +- Join Inner, (dateint#29 = dateint#33)
>    :- Project [val#28, dateint#29]
>    :  +- Filter isnotnull(dateint#29)
>    :     +- Relation[val#28,dateint#29] parquet
>    +- Project [val#32, dateint#33]
>       +- Filter isnotnull(dateint#33)
>          +- Relation[val#32,dateint#33] parquet
> == Physical Plan ==
> *(5) Project [dateint#29, val#28, val#32]
> +- *(5) SortMergeJoin [dateint#29], [dateint#33], Inner
>    :- *(2) Sort [dateint#29 ASC NULLS FIRST], false, 0
>    :  +- Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
>    :     +- *(1) FileScan parquet jzhuge.parquet_with_part[val#28,dateint#29] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#29)], PushedFilters: [], ReadSchema: struct<val:string>
>    +- *(4) Sort [dateint#33 ASC NULLS FIRST], false, 0
>       +- ReusedExchange [val#32, dateint#33], Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
> {noformat}
> Broadcast hint is applied to a Parquet table without partitions. Below, "BroadcastHashJoin" is chosen as expected.
> {noformat}
> scala> spark.sql("CREATE TABLE jzhuge.parquet_no_part (val STRING, dateint INT) STORED AS parquet")
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> scala> Seq(spark.table("jzhuge.parquet_no_part")).map(df => df.join(broadcast(df), "dateint").explain(true))
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List(dateint))
> :- SubqueryAlias `jzhuge`.`parquet_no_part`
> :  +- Relation[val#44,dateint#45] parquet
> +- ResolvedHint (broadcast)
>    +- SubqueryAlias `jzhuge`.`parquet_no_part`
>       +- Relation[val#50,dateint#51] parquet
> == Analyzed Logical Plan ==
> dateint: int, val: string, val: string
> Project [dateint#45, val#44, val#50]
> +- Join Inner, (dateint#45 = dateint#51)
>    :- SubqueryAlias `jzhuge`.`parquet_no_part`
>    :  +- Relation[val#44,dateint#45] parquet
>    +- ResolvedHint (broadcast)
>       +- SubqueryAlias `jzhuge`.`parquet_no_part`
>          +- Relation[val#50,dateint#51] parquet
> == Optimized Logical Plan ==
> Project [dateint#45, val#44, val#50]
> +- Join Inner, (dateint#45 = dateint#51)
>    :- Filter isnotnull(dateint#45)
>    :  +- Relation[val#44,dateint#45] parquet
>    +- ResolvedHint (broadcast)
>       +- Filter isnotnull(dateint#51)
>          +- Relation[val#50,dateint#51] parquet
> == Physical Plan ==
> *(2) Project [dateint#45, val#44, val#50]
> +- *(2) BroadcastHashJoin [dateint#45], [dateint#51], Inner, BuildRight
>    :- *(2) Project [val#44, dateint#45]
>    :  +- *(2) Filter isnotnull(dateint#45)
>    :     +- *(2) FileScan parquet jzhuge.parquet_no_part[val#44,dateint#45] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
>    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
>       +- *(1) Project [val#50, dateint#51]
>          +- *(1) Filter isnotnull(dateint#51)
>             +- *(1) FileScan parquet jzhuge.parquet_no_part[val#50,dateint#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
> {noformat}
> Observed a similar issue with a partitioned ORC table. SequenceFile is fine.


