Posted to user@spark.apache.org by Danil Suetin <su...@protonmail.com.INVALID> on 2022/02/11 14:15:24 UTC

Repartitioning dataframe by file write size and preserving order

Hello,

I want to be able to write a dataframe as ORC or Parquet files with a set average file size, while preserving the dataframe's sort order. The task is that I receive a dataframe I know nothing about as an argument, and I need to write ORC or Parquet files of roughly constant size with minimal variance.

Most repartitioning methods work either with a number of partitions or with a maximum number of rows per file. Both can be derived from the average row size, which I can estimate by sampling the dataframe, serializing the sample with compression, and measuring its on-disk size. That is of course not exact, because of how columnar compression behaves, but as an estimate it might be good enough.
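For illustration, this is roughly how I picture that estimation step; the 1% sample fraction and the temporary path are placeholder values of mine, not anything Spark requires:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Rough estimate of the average serialized row size: write a compressed
    // sample to a temporary location and divide the on-disk size by the row count.
    def estimateAvgRowSize(spark: SparkSession, df: DataFrame, tmpPath: String): Double = {
      val sample = df.sample(withReplacement = false, fraction = 0.01).cache()
      val sampleCount = sample.count()
      sample.write.mode("overwrite").orc(tmpPath)  // same format/codec as the final write
      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      val bytes = fs.getContentSummary(new Path(tmpPath)).getLength
      sample.unpersist()
      bytes.toDouble / math.max(sampleCount, 1L)
    }

The target rows per file would then just be the desired file size divided by this estimate.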

As for repartitioning itself, then I have tried:

- df.repartition - Can split the data into evenly sized partitions, but because of hash partitioning the sort order is not preserved.
- df.repartitionByRange.sortWithinPartitions - Can preserve sorting if the original sort keys are known (I might not know them), although some say repartitionByRange does not always preserve ordering. And if the keys are not uniformly distributed, the file sizes will vary a lot.

- df.coalesce - Sorting seems to be preserved, although some say that is not the case for every version of Spark. Also, partition sizes may vary a lot, and it can only decrease the number of partitions.

- df.write.option("maxRecordsPerFile", 10000) - Not sure about sorting preservation. And also seem there still problem with small files, due to no minimum on records per file. Merging to one partition and then using maxRecordsPerFile won't work since it might not fit in one partition.

What I am trying to solve seems to be a complex bin-packing problem, which should also parallelize. As a simple approach, I thought I might count the rows in each existing partition and build a new dataframe out of write tasks, roughly this way:
Let's say I want 100M files and the average row size is 2M (just for example), which means 50 rows per file/partition.
[repartition.png]
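To make the counting step concrete, it could look something like the sketch below; the grouping of these counts into contiguous ~50-row output tasks (which the attached picture is meant to illustrate) would still have to be built on top of it:

    import org.apache.spark.sql.DataFrame

    // Count the rows in each existing partition without shuffling the data;
    // the resulting counts would then be carved into contiguous ranges of
    // roughly rowsPerFile rows each, keeping the original order.
    def rowsPerPartition(df: DataFrame): Array[(Int, Long)] =
      df.rdd
        .mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size.toLong)))
        .collect()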

But I am not quite sure it's the best way of doing this. Also, a lot of optimization of task and partition locality would be needed to reduce network load.

Are there any built-in features or libraries that might help me solve this? Or has anyone had the same issue?

Thanks in advance
Danil