Posted to dev@parquet.apache.org by "Cheng Lian (JIRA)" <ji...@apache.org> on 2015/06/06 17:41:00 UTC

[jira] [Comment Edited] (PARQUET-222) parquet writer runs into OOM during writing when calling DataFrame.saveAsParquetFile in Spark SQL

    [ https://issues.apache.org/jira/browse/PARQUET-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14575783#comment-14575783 ] 

Cheng Lian edited comment on PARQUET-222 at 6/6/15 3:40 PM:
------------------------------------------------------------

There are several ways to alleviate this.

Firstly, for DataFrames whose data size is small (e.g., the single-row case [~phatak.dev] mentioned), you may try {{df.coalesce(1)}} to reduce the partition number to 1, so that only a single file is written.  In most cases, the default parallelism equals the number of cores.  For example, if you are running a Spark application with a single executor on a single 8-core node, that executor process needs to write 8 Parquet files even if there's only a single row.
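
As a minimal sketch of this tip (Spark 1.3-era API; the app name, data, and output path are made up for illustration):

{noformat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("small-write"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// A tiny DataFrame: without coalesce, every default partition would
// open its own Parquet writer even though there is only one row.
val df = sc.parallelize(Seq((1, "a"))).toDF("id", "value")
df.coalesce(1).saveAsParquetFile("/tmp/single-row.parquet")
{noformat}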

Secondly, when you are writing DataFrames with a large data volume, you may try to adjust the DataFrame partition number (via {{df.repartition(m)}} and/or {{df.coalesce(m)}}) and the executor number (via the {{--num-executors}} flag of {{spark-submit}}) to ensure the former is less than or equal to the latter, so that each executor process opens and writes at most one Parquet file at a time.
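
A sketch of this sizing rule (the executor count and both paths are illustrative assumptions, and {{sc}}/{{sqlContext}} are reused from the sketch above):

{noformat}
// Suppose the job was launched with `spark-submit --num-executors 4 ...`.
val numExecutors = 4 // must match the value passed to --num-executors

// A large input, read back with the Spark 1.3-era API.
val df = sqlContext.parquetFile("/tmp/big-input.parquet")

// Keep the write-time partition count at or below the executor count,
// so that each executor holds at most one open Parquet writer.
val balanced =
  if (df.rdd.partitions.length > numExecutors) df.coalesce(numExecutors)
  else df
balanced.saveAsParquetFile("/tmp/large-table.parquet")
{noformat}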

And of course, the heap size of a single executor should be large enough to allow Parquet to write at least a single file.
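
Two knobs that help on this front (the 64 MB row group size below is only an example value, not a recommendation; {{sc}} is reused from the sketches above):

{noformat}
// Parquet buffers a whole row group ("block") in memory per open file,
// so a smaller parquet.block.size lowers the per-writer footprint.
sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)

// Alternatively, give each executor more heap at submit time, e.g.:
//   spark-submit --executor-memory 4g ...
{noformat}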


> parquet writer runs into OOM during writing when calling DataFrame.saveAsParquetFile in Spark SQL
> -------------------------------------------------------------------------------------------------
>
>                 Key: PARQUET-222
>                 URL: https://issues.apache.org/jira/browse/PARQUET-222
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.6.0
>            Reporter: Chaozhong Yang
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> In Spark SQL, there is a function {{saveAsParquetFile}} on {{DataFrame}} (and {{SchemaRDD}}). That function calls methods in parquet-mr, and sometimes the write fails with an OOM error thrown by parquet-mr. The exception stack trace is as follows:
> {noformat}
> [WARN] [task-result-getter-3] 03-19 11:17:58,274 [TaskSetManager] - Lost task 0.2 in stage 137.0 (TID 309, hb1.avoscloud.com): java.lang.OutOfMemoryError: Java heap space
>         at parquet.column.values.dictionary.IntList.initSlab(IntList.java:87)
>         at parquet.column.values.dictionary.IntList.<init>(IntList.java:83)
>         at parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:85)
>         at parquet.column.values.dictionary.DictionaryValuesWriter$PlainIntegerDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:549)
>         at parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:88)
>         at parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:74)
>         at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
>         at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>         at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>         at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>         at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>         at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>         at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>         at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>         at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>         at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:304)
>         at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>         at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> {noformat}
> By the way, there is another similar issue: https://issues.apache.org/jira/browse/PARQUET-99. However, the reporter has closed it and marked it as resolved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)