You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by hajyoussef amine <ha...@gmail.com> on 2022/11/09 18:10:18 UTC

[Spark Core] Adaptive dynamic partition pruning

Hello everyone,

Let me take the following spark sql example to demonstrate the issue we're
having:

```
Select * FROM small_table
    Inner join big_table on small_table.foreign_key =
big_table.partition_key
    Inner join bigger_table on big_table.foreign_key =
bigger_table.partition_key
  where small_date.date="2022-01-01"
```

The efficient execution plan in the case above is to load the small table
filtered by date(resulting in a few rows), use it to partition prune the
big_table so only relevant partitions are loaded, then join the two tables
together, and use the join result to partition prune the bigger table.

I can't find a way to easily implement the strategy above in spark. Dynamic
partition pruning seems to support only one level of depth, so the
big_table is partition pruned, but the bigger table is always fully loaded.

Spark tries to parallelize things so it loads all the tables in parallel.
Interestingly, however, that is not the ideal approach in this case. I'm
not sure if spark has a mechanism to cancel pending tasks and adaptively
change physical execution strategy as new information comes in(in this for
example, spark ideally cancels loading the bigger_table, after the
small_table big_table join result is available and a small amount of rows
are returned. spark can use the resulting rows to partition prune the
bigger table assuming partition keys are in the join condition)

The only way I found to implement the strategy is to break the computation
in two steps, persist the first join result into disk, and then load it and
use it to partition and prune the bigger table. The code will be something
like this:


```
spark.sql("""
Select * FROM small_table
    Inner join big_table on small_table.foreign_key =
big_table.partition_key
    where small_date.date="2022-01-01"

""").write.format("parquet").mode("overwrite").save("path/to/test.parquet")

spark.read.format("parquet").load("path/to/test.parquet").createOrReplaceTempView("step1")

spark.sql("""
Select * FROM step_1
    Inner join bigger_table on step_1.foreign_key =
bigger_table.partition_key
    where step_1.date="2022-01-01"
""").collect()
```

I could not get `persist` to trigger computation for some reason(even after
running `count` on it), that's why I had to save it into a parquet, and
then reload it.


The issue with the code above apart from having to save it in disk is that
it requires manual rewriting queries which is not convenient especially for
queries with multiple joins.


I'm looking for some insights on how to efficiently execute the query above
without having to fetch full data of the joined tables.

Re: [Spark Core] Adaptive dynamic partition pruning

Posted by Jie Han <tu...@gmail.com>.
Hmmm… Sorry, I don’t have an idea. Maybe we can try subquery? I’m not sure whether it can work :( . We need help from other members of the community.

> 2022年11月12日 00:10,hajyoussef amine <ha...@gmail.com> 写道:
> 
> Hi Jie,
> Let's suppose we have ((dimension_table Join fact_table1) join fact_table2). In the case where (dimension_table JOIN fact_table1) is small enough, the result ideally can be treated as another dimension table and thus used to prune the fact_table2. I don't find an easy way to implement it though.
> 
> 
> On Fri, Nov 11, 2022 at 4:32 PM Jie Han <tunyum94@gmail.com <ma...@gmail.com>> wrote:
> FYI, https://medium.com/@prabhakaran.electric/spark-3-0-feature-dynamic-partition-pruning-dpp-to-avoid-scanning-irrelevant-data-1a7bbd006a89 <https://medium.com/@prabhakaran.electric/spark-3-0-feature-dynamic-partition-pruning-dpp-to-avoid-scanning-irrelevant-data-1a7bbd006a89>
> 
> This blog may be helpful. Dynamic pruning often works for star schema queries. So, your fact table is big_table which is used to join the others. So there’s only one subqueryboradcast dynamicpruning plan before big_table’s scan while there’s none for the others.
> 
> I’m not sure that I’m correct. Hope it’s helpful to you.
> 
>> 2022年11月11日 21:43,hajyoussef amine <hajyoussef.amine@gmail.com <ma...@gmail.com>> 写道:
>> 
>> SubqueryBroadcast
> 


Re: [Spark Core] Adaptive dynamic partition pruning

Posted by hajyoussef amine <ha...@gmail.com>.
Hi Jie,
Let's suppose we have ((dimension_table Join fact_table1) join
fact_table2). In the case where (dimension_table JOIN fact_table1) is small
enough, the result ideally can be treated as another dimension table and
thus used to prune the fact_table2. I don't find an easy way to implement
it though.


On Fri, Nov 11, 2022 at 4:32 PM Jie Han <tu...@gmail.com> wrote:

> FYI,
> https://medium.com/@prabhakaran.electric/spark-3-0-feature-dynamic-partition-pruning-dpp-to-avoid-scanning-irrelevant-data-1a7bbd006a89
>
> This blog may be helpful. Dynamic pruning often works for star schema
> queries. So, your fact table is big_table which is used to join the others.
> So there’s only one subqueryboradcast dynamicpruning plan before
> big_table’s scan while there’s none for the others.
>
> I’m not sure that I’m correct. Hope it’s helpful to you.
>
> 2022年11月11日 21:43,hajyoussef amine <ha...@gmail.com> 写道:
>
> SubqueryBroadcast
>
>
>

Re: [Spark Core] Adaptive dynamic partition pruning

Posted by Jie Han <tu...@gmail.com>.
FYI, https://medium.com/@prabhakaran.electric/spark-3-0-feature-dynamic-partition-pruning-dpp-to-avoid-scanning-irrelevant-data-1a7bbd006a89 <https://medium.com/@prabhakaran.electric/spark-3-0-feature-dynamic-partition-pruning-dpp-to-avoid-scanning-irrelevant-data-1a7bbd006a89>

This blog may be helpful. Dynamic pruning often works for star schema queries. So, your fact table is big_table which is used to join the others. So there’s only one subqueryboradcast dynamicpruning plan before big_table’s scan while there’s none for the others.

I’m not sure that I’m correct. Hope it’s helpful to you.

> 2022年11月11日 21:43,hajyoussef amine <ha...@gmail.com> 写道:
> 
> SubqueryBroadcast


Re: [Spark Core] Adaptive dynamic partition pruning

Posted by hajyoussef amine <ha...@gmail.com>.
Hi Jie, Thank you for the response. Dynamic pruning work to filter prune
the first join not the second one. so in the example I shared above.
big_table is partition pruned but bigger_table is not.
Here's the result of running explain extended on the following query:

Select * FROM jlee_ntm.tt_om_po AS small_table
  INNER JOIN jlee_ntm.TT_OM_POLINE AS big_table ON
    small_table.year=big_table.year
  INNER JOIN jlee_ntm.TT_OM_POLINE_SHIPMENT AS bigger_table ON
    big_table.month=bigger_table.month
  WHERE
    small_table.createtime < "2003"


In the fictional query above: big_table is partitioned by year and
bigger_table is partitioned by month.


== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('small_table.createtime < 2003)
+- 'Join Inner, ('big_table.month = 'bigger_table.month)
:- 'Join Inner, ('small_table.year = 'big_table.year)
: :- 'SubqueryAlias small_table
: : +- 'UnresolvedRelation [default, small_table], [], false
: +- 'SubqueryAlias big_table
: +- 'UnresolvedRelation [default, big_table], [], false
+- 'SubqueryAlias bigger_table
+- 'UnresolvedRelation [default, bigger_table], [], false

== Analyzed Logical Plan ==

Project [ ... 281 more fields]
+- Filter (createtime#6730 < cast(2003 as timestamp))
+- Join Inner, (month#6924 = month#7098)
:- Join Inner, (year#6774 = year#6923)
: :- SubqueryAlias small_table
: : +- Relation default.small_table[... 44 more fields] parquet
: +- SubqueryAlias big_table
: +- Relation default.big_table[... 127 more fields] parquet
+- SubqueryAlias bigger_table
+- Relation default.bigger_table[... 62 more fields] parquet

== Optimized Logical Plan ==
Join Inner, (month#6924 = month#7098),
leftHint=(dynamicPruningFilterId=Some(7104))
:- Join Inner, (year#6774 = year#6923),
leftHint=(dynamicPruningFilterId=Some(7102))
: :- Filter ((isnotnull(createtime#6730) AND (createtime#6730 < 2003-01-01
00:00:00)) AND isnotnull(year#6774))
: : +- Filter ((year#6774 < year(cast(2003-01-01 00:00:00 as date))) OR
((year#6774 = year(cast(2003-01-01 00:00:00 as date))) AND (month#6775 <=
month(cast(2003-01-01 00:00:00 as date)))))
: : +- Relation default.small_table[... 44 more fields] parquet
: +- Filter ((isnotnull(year#6923) AND isnotnull(month#6924)) AND
dynamicpruning#7103 7102)
: +- Relation default.big_table[... 127 more fields] parquet
+- Filter (isnotnull(month#7098) AND dynamicpruning#7105 7104)
+- Relation default.bigger_table[... 62 more fields] parquet

== Physical Plan ==
BroadcastHashJoin [month#6924], [month#7098], Inner, BuildRight, false
:- BroadcastHashJoin [year#6774], [year#6923], Inner, BuildLeft, false
: :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[66, int,
true] as bigint)),false), [plan_id=1487]
: : +- *(1) Filter (isnotnull(createtime#6730) AND (createtime#6730 <
2003-01-01 00:00:00))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.small_table[... 44 more fields] Batched:
true, DataFilters: [isnotnull(createtime#6730), (createtime#6730 <
2003-01-01 00:00:00)], Format: Parquet, Location: PreparedDeltaFileIndex(1
paths)[..., PartitionFilters: [((year#6774 < year(cast(2003-01-01 00:00:00
as date))) OR ((year#6774 = year(cast(2003-01-01 00:..., PushedFilters:
[IsNotNull(createtime), LessThan(createtime,2003-01-01 00:00:00.0)],
ReadSchema: ...
: +- Project [ ... 127 more fields]
: +- FileScan parquet default.big_table[... 127 more fields] Batched:
false, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1
paths)[..., PartitionFilters: [isnotnull(year#6923), isnotnull(month#6924),
dynamicpruningexpression(year#6923 IN dynamicprunin..., PushedFilters: [],
ReadSchema: ...
: +- SubqueryBroadcast dynamicpruning#7103, 0, [year#6774], false, [id=#1511
]
: +- ReusedExchange [ ... 44 more fields], BroadcastExchange
HashedRelationBroadcastMode(List(cast(input[66, int, true] as
bigint)),false), [plan_id=1487]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[85, int,
true] as bigint)),false), [plan_id=1502]
+- *(2) ColumnarToRow
+- FileScan parquet default.bigger_table[.... 62 more fields] Batched:
true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1
paths)[......, PartitionFilters: [isnotnull(month#7098),
dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: ...


The interesting part is that I'm seeing a reference to dynamicpruning#7104
in the optimized logical plan of the default.bigger_table relation.
However, in the physical plan, it translates
into dynamicpruningexpression(true) which I assume doesn't filter anything.
Not sure why that's the case. in the query plan above, bigger_table will be
fully loaded.

On Fri, Nov 11, 2022 at 5:53 AM Jie Han <tu...@gmail.com> wrote:

> Which version are you using? I test it in spark 3.2.1 and sure that
> dynamic pruning works in queries with multi joins.
> BTW, could you execute ‘explain extended your sql’?
>
> > 2022年11月10日 02:10,hajyoussef amine <ha...@gmail.com> 写道:
> >
> > Hello everyone,
> >
> > Let me take the following spark sql example to demonstrate the issue
> we're having:
> >
> > ```
> > Select * FROM small_table
> >     Inner join big_table on small_table.foreign_key =
> big_table.partition_key
> >     Inner join bigger_table on big_table.foreign_key =
> bigger_table.partition_key
> >   where small_date.date="2022-01-01"
> > ```
> >
> > The efficient execution plan in the case above is to load the small
> table filtered by date(resulting in a few rows), use it to partition prune
> the big_table so only relevant partitions are loaded, then join the two
> tables together, and use the join result to partition prune the bigger
> table.
> >
> > I can't find a way to easily implement the strategy above in spark.
> Dynamic partition pruning seems to support only one level of depth, so the
> big_table is partition pruned, but the bigger table is always fully loaded.
> >
> > Spark tries to parallelize things so it loads all the tables in
> parallel. Interestingly, however, that is not the ideal approach in this
> case. I'm not sure if spark has a mechanism to cancel pending tasks and
> adaptively change physical execution strategy as new information comes
> in(in this for example, spark ideally cancels loading the bigger_table,
> after the small_table big_table join result is available and a small amount
> of rows are returned. spark can use the resulting rows to partition prune
> the bigger table assuming partition keys are in the join condition)
> >
> > The only way I found to implement the strategy is to break the
> computation in two steps, persist the first join result into disk, and then
> load it and use it to partition and prune the bigger table. The code will
> be something like this:
> >
> >
> > ```
> > spark.sql("""
> > Select * FROM small_table
> >     Inner join big_table on small_table.foreign_key =
> big_table.partition_key
> >     where small_date.date="2022-01-01"
> >
>  """).write.format("parquet").mode("overwrite").save("path/to/test.parquet")
> >
> >
> spark.read.format("parquet").load("path/to/test.parquet").createOrReplaceTempView("step1")
> >
> > spark.sql("""
> > Select * FROM step_1
> >     Inner join bigger_table on step_1.foreign_key =
> bigger_table.partition_key
> >     where step_1.date="2022-01-01"
> > """).collect()
> > ```
> >
> > I could not get `persist` to trigger computation for some reason(even
> after running `count` on it), that's why I had to save it into a parquet,
> and then reload it.
> >
> >
> > The issue with the code above apart from having to save it in disk is
> that it requires manual rewriting queries which is not convenient
> especially for queries with multiple joins.
> >
> >
> > I'm looking for some insights on how to efficiently execute the query
> above without having to fetch full data of the joined tables.
>
>

Re: [Spark Core] Adaptive dynamic partition pruning

Posted by Jie Han <tu...@gmail.com>.
Which version are you using? I test it in spark 3.2.1 and sure that dynamic pruning works in queries with multi joins.
BTW, could you execute ‘explain extended your sql’?

> 2022年11月10日 02:10,hajyoussef amine <ha...@gmail.com> 写道:
> 
> Hello everyone,
> 
> Let me take the following spark sql example to demonstrate the issue we're having:
> 
> ```
> Select * FROM small_table
>     Inner join big_table on small_table.foreign_key = big_table.partition_key
>     Inner join bigger_table on big_table.foreign_key = bigger_table.partition_key
>   where small_date.date="2022-01-01"
> ```
> 
> The efficient execution plan in the case above is to load the small table filtered by date(resulting in a few rows), use it to partition prune the big_table so only relevant partitions are loaded, then join the two tables together, and use the join result to partition prune the bigger table.
> 
> I can't find a way to easily implement the strategy above in spark. Dynamic partition pruning seems to support only one level of depth, so the big_table is partition pruned, but the bigger table is always fully loaded.
> 
> Spark tries to parallelize things so it loads all the tables in parallel. Interestingly, however, that is not the ideal approach in this case. I'm not sure if spark has a mechanism to cancel pending tasks and adaptively change physical execution strategy as new information comes in(in this for example, spark ideally cancels loading the bigger_table, after the small_table big_table join result is available and a small amount of rows are returned. spark can use the resulting rows to partition prune the bigger table assuming partition keys are in the join condition)
> 
> The only way I found to implement the strategy is to break the computation in two steps, persist the first join result into disk, and then load it and use it to partition and prune the bigger table. The code will be something like this:
> 
> 
> ```
> spark.sql("""
> Select * FROM small_table
>     Inner join big_table on small_table.foreign_key = big_table.partition_key
>     where small_date.date="2022-01-01"
>     """).write.format("parquet").mode("overwrite").save("path/to/test.parquet")
> 
> spark.read.format("parquet").load("path/to/test.parquet").createOrReplaceTempView("step1")
> 
> spark.sql("""
> Select * FROM step_1
>     Inner join bigger_table on step_1.foreign_key = bigger_table.partition_key
>     where step_1.date="2022-01-01"
> """).collect()
> ```
> 
> I could not get `persist` to trigger computation for some reason(even after running `count` on it), that's why I had to save it into a parquet, and then reload it.
> 
> 
> The issue with the code above apart from having to save it in disk is that it requires manual rewriting queries which is not convenient especially for queries with multiple joins. 
> 
> 
> I'm looking for some insights on how to efficiently execute the query above without having to fetch full data of the joined tables.


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org