Posted to commits@hudi.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2022/08/23 01:13:00 UTC

[jira] [Closed] (HUDI-1461) Bulk insert v2 creates additional small files

     [ https://issues.apache.org/jira/browse/HUDI-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin closed HUDI-1461.
---------------------------------
    Resolution: Duplicate

> Bulk insert v2 creates additional small files
> ---------------------------------------------
>
>                 Key: HUDI-1461
>                 URL: https://issues.apache.org/jira/browse/HUDI-1461
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: performance
>            Reporter: Wenning Ding
>            Priority: Major
>
> I took a look at the data preparation step for bulk insert and found that the current logic creates additional small files when performing bulk insert v2, which hurts performance.
> The current logic first sorts the input dataframe and then does a coalesce: [https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java#L104-L106]
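> The linked helper essentially boils down to the pattern sketched below. This is a simplified Scala illustration only, not the actual Hudi code; inputDf stands for the incoming dataset, and the parallelism variable and meta column names are assumptions for the example:
> {code:java}
> // Simplified sketch of the current preparation step: globally sort the rows,
> // then coalesce down to the configured bulk insert parallelism.
> val bulkInsertShuffleParallelism = 2  // i.e. BulkInsertShuffleParallelism from the example below
> val prepared = inputDf
>   .sort("_hoodie_partition_path", "_hoodie_record_key")
>   .coalesce(bulkInsertShuffleParallelism)
> {code}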
> For example, we set BulkInsertShuffleParallelism to 2 and have the following df as input:
> {code:java}
> val df = Seq(
>   (100, "event_name_16", "2015-01-01T13:51:39.340396Z", "type1"),
>   (101, "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   (104, "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   (108, "event_name_18", "2015-01-01T11:51:33.340396Z", "type1"),
>   (109, "event_name_19", "2014-01-01T11:51:33.340396Z", "type3"),
>   (110, "event_name_20", "2014-02-01T11:51:33.340396Z", "type3"),
>   (105, "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> {code}
> (In the output below I add a partitionID column to make the Spark partitioning visible.) Based on the current logic, after sorting and coalescing, the dataframe becomes:
> {code:java}
> val df2 = df.sort(functions.col("event_type"), functions.col("event_id")).coalesce(2)
> df2.withColumn("partitionID", spark_partition_id).show(false)
> +--------+--------------+---------------------------+----------+-----------+
> |event_id|event_name    |event_ts                   |event_type|partitionID|
> +--------+--------------+---------------------------+----------+-----------+
> |100     |event_name_16 |2015-01-01T13:51:39.340396Z|type1     |0          |
> |108     |event_name_18 |2015-01-01T11:51:33.340396Z|type1     |0          |
> |105     |event_name_678|2015-01-01T13:51:42.248818Z|type2     |0          |
> |110     |event_name_20 |2014-02-01T11:51:33.340396Z|type3     |0          |
> |104     |event_name_123|2015-01-01T12:15:00.512679Z|type1     |1          |
> |101     |event_name_546|2015-01-01T12:14:58.597216Z|type2     |1          |
> |109     |event_name_19 |2014-01-01T11:51:33.340396Z|type3     |1          |
> +--------+--------------+---------------------------+----------+-----------+
> {code}
> You can see that the coalesce result does not actually depend on the sorting result: each Spark partition contains rows from all 3 Hudi partitions (event types).
> So during the write phase, each Spark executor takes its corresponding partition id and creates 3 files under 3 Hudi partitions, leaving two parquet files under each Hudi partition. With such a small dataset we should ideally end up with a single file under each Hudi partition.
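> As a quick illustrative check on the df2 above, we can count how many Spark partitions each event_type is spread across:
> {code:java}
> // Each event_type, i.e. each Hudi partition, spans both Spark partitions,
> // which is why each Hudi partition receives two small files.
> df2.withColumn("partitionID", spark_partition_id)
>   .groupBy("event_type")
>   .agg(functions.countDistinct("partitionID").as("numSparkPartitions"))
>   .show(false)
> {code}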
> If I change the sort to a repartition on event_type:
> {code:java}
> val df3 = df.repartition(functions.col("event_type")).coalesce(2)
> df3.withColumn("partitionID", spark_partition_id).show(false)
> +--------+--------------+---------------------------+----------+-----------+
> |event_id|event_name    |event_ts                   |event_type|partitionID|
> +--------+--------------+---------------------------+----------+-----------+
> |100     |event_name_16 |2015-01-01T13:51:39.340396Z|type1     |0          |
> |104     |event_name_123|2015-01-01T12:15:00.512679Z|type1     |0          |
> |108     |event_name_18 |2015-01-01T11:51:33.340396Z|type1     |0          |
> |101     |event_name_546|2015-01-01T12:14:58.597216Z|type2     |1          |
> |105     |event_name_678|2015-01-01T13:51:42.248818Z|type2     |1          |
> |109     |event_name_19 |2014-01-01T11:51:33.340396Z|type3     |1          |
> |110     |event_name_20 |2014-02-01T11:51:33.340396Z|type3     |1          |
> +--------+--------------+---------------------------+----------+-----------+
> {code}
> In this case, we end up with a single file under each Hudi partition.
>  
> But according to our understanding, we still need the sort so that we can benefit from the min/max record key index. So the problem is how to handle this logic correctly.
> Repartitioning and then sorting within each partition might be a way (see the sketch below), though sorting within each partition could cause OOM issues if the data is unbalanced.
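> For illustration only (not a proposed patch), that idea could look roughly like this on the same df:
> {code:java}
> // Co-locate rows of the same Hudi partition, then sort locally so the
> // min/max record key index can still benefit; no global sort is needed.
> val df4 = df
>   .repartition(2, functions.col("event_type"))
>   .sortWithinPartitions(functions.col("event_type"), functions.col("event_id"))
> df4.withColumn("partitionID", spark_partition_id).show(false)
> {code}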



--
This message was sent by Atlassian Jira
(v8.20.10#820010)