Posted to dev@spark.apache.org by Tomas Bartalos <to...@gmail.com> on 2019/05/03 16:38:22 UTC

How to force Spark to honor parquet partitioning

Hello,

I have parquet files partitioned by the "event_hour" column.
After reading the parquet files into Spark with:
spark.read.format("parquet").load("...")
files from the same parquet partition are scattered across many Spark
partitions.
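
For context, the layout on disk is the usual Hive-style partition
directories (the paths below are only an illustration, not the real ones):

    /data/events/event_hour=2019050101/part-00000-...-c000.snappy.parquet
    /data/events/event_hour=2019050102/part-00000-...-c000.snappy.parquet
    ...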

Example mapping of Spark partition -> parquet partition:

Spark partition 1 -> 2019050101, 2019050102, 2019050103
Spark partition 2 -> 2019050101, 2019050103, 2019050104
...
Spark partition 20 -> 2019050101, ...
Spark partition 21 -> 2019050101, ...

As you can see, parquet partition 2019050101 is present in Spark partitions
1, 2, 20 and 21.
As a result, when I write out the DataFrame:
df.write.partitionBy("event_hour").format("parquet").save("...")

There are many files created in one parquet partition (in our example it's
4 files, but in reality it's many more).
To speed up queries, my goal is to write 1 file per parquet partition (1
file per hour).

So far my only solution is to use repartition:
df.repartition(col("event_hour"))

But that brings a lot of overhead from an unnecessary shuffle. I'd like to
force Spark to "pick up" the existing parquet partitioning.

In my investigation I've found
org.apache.spark.sql.execution.FileSourceScanExec#createNonBucketedReadRDD
<https://github.com/apache/spark/blob/a44880ba74caab7a987128cb09c4bee41617770a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L452>
where the initial partitioning happens based on file sizes: the files are
explicitly ordered by size and packed into splits, which is why files from
the same parquet partition end up scattered across many Spark partitions.
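
For reference, as far as I can tell that packing is mainly governed by the
file-split settings below (the values shown are only illustrative; the
defaults are 128 MB and 4 MB). Tuning them changes how many files get packed
into one Spark partition, but it doesn't make the packing follow the
event_hour directories:

    // Illustrative values only; these settings control the maximum split
    // size and the cost charged for opening each file when files are
    // packed into read partitions.
    spark.conf.set("spark.sql.files.maxPartitionBytes", "536870912") // 512 MB
    spark.conf.set("spark.sql.files.openCostInBytes", "4194304")     // 4 MB

    // Even with bigger splits, files from different event_hour directories
    // can still land in the same Spark partition, so a later
    // partitionBy("event_hour") write still produces several files per hour.
    val df = spark.read.format("parquet").load("/path/to/input")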

thank you for your help,
Tomas

Re: How to force Spark to honor parquet partitioning

Posted by Gourav Sengupta <go...@gmail.com>.
So you want data from one physical partition on disk to go to only one
executor?

On Fri, May 3, 2019 at 5:38 PM Tomas Bartalos <to...@gmail.com>
wrote:

> Hello,
>
> I have parquet files partitioned by the "event_hour" column.
> ...