Posted to user@spark.apache.org by Tomas Bartalos <to...@gmail.com> on 2019/07/12 17:44:24 UTC

Partition pruning by IDs from another table

Hello,
I have 2 parquet tables:
stored - table of 10 M records
data - table of 100K records

*This is fast:*
val dataW = data.where("registration_ts in (20190516204l, 20190515143l, 20190510125l, 20190503151l)")
dataW.count
res44: Long = 42
//takes 3 seconds
stored.join(broadcast(dataW), Seq("registration_ts"), "leftsemi").collect

*This is similar, but it's slow:*
val dataW = data.limit(10).select("registration_ts").distinct
dataW.count
res45: Long = 1
//takes 2 minutes
stored.join(broadcast(dataW), Seq("registration_ts"), "leftsemi").collect
[Stage 181:>                                                      (0 + 1) / 373]

The reason is that the first query propagates PartitionFilters to the scan
of the joined "stored" table:
... PartitionFilters: [registration_ts#1635L IN (20190516204,20190515143,20190510125,20190503151)
And the second one does not:
PartitionFilters: []

For a small number of IDs it's more efficient to collect them to the driver
and issue a second query with a partition filter, but there has to be a
better way...
How can I achieve effective partition pruning when using IDs from another
table?
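
For reference, a minimal sketch of the collect-to-driver workaround
mentioned above. The helper name pruneByCollectedIds and the maxIds
threshold are hypothetical, not part of Spark, and the sketch assumes the
key is a top-level Long column (as registration_ts is here). It collects the
distinct IDs to the driver and filters the large table with a literal IN
list, falling back to the broadcast semi join when there are too many IDs:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col}

// Hypothetical helper, not part of Spark. Assumes `key` is a top-level
// LongType column present in both DataFrames.
def pruneByCollectedIds(
    stored: DataFrame,
    ids: DataFrame,
    key: String,
    maxIds: Int = 1000): DataFrame = {
  // Null keys never match in a semi join, so drop them before collecting.
  val distinctIds = ids.select(key).where(col(key).isNotNull).distinct()

  // Fetch at most maxIds + 1 values so an oversized ID set is detected
  // without pulling all of it to the driver.
  val values: Seq[Long] =
    distinctIds.limit(maxIds + 1).collect().map(_.getLong(0)).toSeq

  if (values.isEmpty) {
    // No IDs at all: the semi join would return no rows.
    stored.limit(0)
  } else if (values.size <= maxIds) {
    // Literal IN list: a static predicate on the partition column.
    stored.where(col(key).isin(values: _*))
  } else {
    // Too many IDs for an IN list: fall back to the original semi join.
    stored.join(broadcast(distinctIds), Seq(key), "leftsemi")
  }
}
```

Usage would be pruneByCollectedIds(stored, dataW, "registration_ts").collect;
whether the IN list actually shows up under PartitionFilters is best
confirmed with explain on the returned DataFrame.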

The following SQL has the same query plan and the same behavior:
spark.sql("select * from stored where exists (select 1 from dataW where dataW.registration_ts = stored.registration_ts)")

Thank you,
Tomas