You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:02:33 UTC

[jira] [Updated] (SPARK-23771) Uneven Rowgroup size after repartition

     [ https://issues.apache.org/jira/browse/SPARK-23771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-23771:
---------------------------------
    Labels: bulk-closed  (was: )

> Uneven Rowgroup size after repartition
> --------------------------------------
>
>                 Key: SPARK-23771
>                 URL: https://issues.apache.org/jira/browse/SPARK-23771
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Shuffle, SQL
>    Affects Versions: 1.6.0, 2.2.0
>         Environment: Cloudera CDH 5.13.1
>            Reporter: Johannes Mayer
>            Priority: Major
>              Labels: bulk-closed
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I have a Hive table on AVRO files, that i want to read and store as a partitioned parquet files (one file per partition).
> What i do is:
> {code:java}
> // read the AVRO table and distribute by the partition column
> val data = sql("select * from avro_table distribute by part_col")
>  
> // write data as partitioned parquet files
> data.write.partitionBy(part_col).parquet("output/path/")
> {code}
>  
> I get one file per partition as expected. But often I run into OutOfMemory Errors. Investigating the issue I found out, that some row groups are very big and since all data of a row group is held in memory before it is flushed to disk, i think this causes the OutOfMemory. Other row groups are very small, containing almost no data. See the output from parquet-tools meta:
>  
> {code:java}
> row group 1: RC:5740100 TS:566954562 OFFSET:4 
> row group 2: RC:33769 TS:2904145 OFFSET:117971092 
> row group 3: RC:31822 TS:2772650 OFFSET:118905225 
> row group 4: RC:29854 TS:2704127 OFFSET:119793188 
> row group 5: RC:28050 TS:2356729 OFFSET:120660675 
> row group 6: RC:26507 TS:2111983 OFFSET:121406541 
> row group 7: RC:25143 TS:1967731 OFFSET:122069351 
> row group 8: RC:23876 TS:1991238 OFFSET:122682160 
> row group 9: RC:22584 TS:2069463 OFFSET:123303246 
> row group 10: RC:21225 TS:1955748 OFFSET:123960700 
> row group 11: RC:19960 TS:1931889 OFFSET:124575333 
> row group 12: RC:18806 TS:1725871 OFFSET:125132862 
> row group 13: RC:17719 TS:1653309 OFFSET:125668057 
> row group 14: RC:1617743 TS:157973949 OFFSET:134217728{code}
>  
> One thing to notice is, that this file was written in a Spark application running on 13 executors. Is it possible, that local data is in the big row group and the remote reads go into seperate (small) row groups? The shuffle is involved, because data is read with distribute by clause.
>  
> Is this a known bug? Is there a workaround to get even row group sizes? I want to decrease the row group size using sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)
>  
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org