Posted to user@spark.apache.org by Anil Dasari <ad...@guidewire.com> on 2022/03/02 01:45:10 UTC

Spark Parquet write OOM

Hello everyone,

We are writing a Spark DataFrame to S3 in Parquet format, and the write is failing with the exception below.

I wanted to try the following to avoid the OOM:


  1.  Increase the default SQL shuffle partitions to reduce the load on the Parquet writer tasks, and
  2.  Increase user memory (by reducing the memory fraction) to leave more memory for other data structures, assuming the Parquet writer uses user memory.

I am not sure whether these will fix the OOM issue, so I wanted to reach out to the community for suggestions. Please let me know.
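
The two options above map onto standard Spark properties. A minimal sketch in plain Java (no Spark dependency) that renders them as spark-submit arguments: `spark.sql.shuffle.partitions` (default 200) and `spark.memory.fraction` (default 0.6) are real Spark settings, while the class name and the values 400 and 0.4 are illustrative assumptions, not recommendations.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renders the two proposed settings as spark-submit flags.
class OomPlanConf {

    // Option 1: more shuffle partitions, so each writer task handles fewer rows.
    // Option 2: a lower memory fraction, which leaves more heap as "user memory".
    static Map<String, String> settings(int shufflePartitions, double memoryFraction) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.sql.shuffle.partitions", Integer.toString(shufflePartitions));
        conf.put("spark.memory.fraction", Double.toString(memoryFraction));
        return conf;
    }

    // Formats each entry as "--conf key=value", in insertion order.
    static String toSubmitArgs(Map<String, String> conf) {
        StringJoiner args = new StringJoiner(" ");
        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add("--conf " + e.getKey() + "=" + e.getValue());
        }
        return args.toString();
    }

    public static void main(String[] a) {
        // Illustrative values only (assumed, not taken from the thread).
        System.out.println(toSubmitArgs(settings(400, 0.4)));
    }
}
```

Note that `spark.memory.fraction` divides the JVM heap; it does not size off-heap allocations.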

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
         at org.apache.spark.scheduler.Task.run(Task.scala:123)
         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
         at sun.misc.Unsafe.allocateMemory(Native Method)
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
         at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
         at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
         at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
         at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
         at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
         at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
         at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
         at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
         at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
         at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
         ... 10 more
         Suppressed: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: BLOCK
                 at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
                 at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
                 at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)

Regards,
Anil
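
The `Caused by` frames above end in `ByteBuffer.allocateDirect`, which requests memory outside the Java heap. A minimal sketch, in plain Java, of that allocation path and of how a direct-memory cap could be passed to executors: `-XX:MaxDirectMemorySize` and `spark.executor.extraJavaOptions` are real options, while the class name, the 1 MiB buffer and the `2g` value are assumptions for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical demo of on-heap versus direct (off-heap) buffers.
class DirectBufferDemo {

    // Same call that appears in the stack trace (ByteBuffer.allocateDirect).
    static ByteBuffer offHeap(int bytes) {
        return ByteBuffer.allocateDirect(bytes);
    }

    // Ordinary heap-backed buffer, bounded by -Xmx.
    static ByteBuffer onHeap(int bytes) {
        return ByteBuffer.allocate(bytes);
    }

    // Builds the Spark property that forwards a JVM flag to every executor.
    static String executorJavaOpts(String maxDirect) {
        return "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=" + maxDirect;
    }

    public static void main(String[] args) {
        ByteBuffer direct = offHeap(1 << 20); // 1 MiB, illustrative size
        ByteBuffer heap = onHeap(1 << 20);
        System.out.println("direct buffer off-heap: " + direct.isDirect());
        System.out.println("heap buffer off-heap:   " + heap.isDirect());
        System.out.println("heap limit (bytes):     " + Runtime.getRuntime().maxMemory());
        System.out.println("--conf " + executorJavaOpts("2g")); // assumed value
    }
}
```

Because the failing buffer is off-heap, settings that only rearrange the heap may not give this allocation more room.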

Re: {EXT} Re: Spark Parquet write OOM

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Anil,

superb. When I said to increase the number of partitions, I meant the
shuffle partitions, because you are de-duplicating. By default I think
that is around 200, which can create issues if your data volume
is large.

I always prefer Spark SQL to Spark DataFrames. The configuration for the
number of records per file should be listed at the following link as
maxRecordsPerFile or something like that:
https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration
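
To make the partition arithmetic concrete: with 200 shuffle partitions, each post-shuffle task receives roughly the total row count divided by 200. A rough sketch in plain Java, assuming rows are spread evenly across partitions. The row counts are hypothetical, and `spark.sql.files.maxRecordsPerFile` is the Spark SQL setting that caps the number of records written to a single file (0 or a negative value means no limit).

```java
// Hypothetical sizing helper; assumes rows spread evenly across partitions.
class WriteSizing {

    // Ceiling division: rows each task writes when work is split evenly.
    static long rowsPerTask(long totalRows, int shufflePartitions) {
        return (totalRows + shufflePartitions - 1) / shufflePartitions;
    }

    // Renders the per-file record cap as a spark-submit flag.
    static String maxRecordsPerFileArg(long maxRecords) {
        return "--conf spark.sql.files.maxRecordsPerFile=" + maxRecords;
    }

    public static void main(String[] args) {
        long totalRows = 1_000_000_000L; // assumed volume, not from the thread
        System.out.println(rowsPerTask(totalRows, 200));  // default partition count
        System.out.println(rowsPerTask(totalRows, 2000)); // ten times more partitions
        System.out.println(maxRecordsPerFileArg(1_000_000L)); // assumed cap
    }
}
```

The record cap splits one task's output into several files; whether that lowers the writer's peak memory in this job is not established in the thread.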



Regards,
Gourav Sengupta

On Sat, Mar 5, 2022 at 5:09 PM Anil Dasari <ad...@guidewire.com> wrote:

> I am not sure how to set the records limit. Let me check. I couldn’t find
> a Parquet row group size configuration in Spark.
>
> For now, I increased the number of shuffle partitions to reduce the
> records processed per task to avoid the OOM.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta <go...@gmail.com>
> *Date: *Saturday, March 5, 2022 at 1:59 AM
> *To: *Anil Dasari <ad...@guidewire.com>
> *Cc: *Yang,Jie(INF) <ya...@baidu.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> any chance you tried setting the limit on the number of records to be
> written out at a time?
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Thu, Mar 3, 2022 at 3:12 PM Anil Dasari <ad...@guidewire.com> wrote:
>
> Hi Gourav,
>
> Tried increasing shuffle partitions number and higher executor memory.
> Both didn’t work.
>
>
>
> Regards
>
>
>
> *From: *Gourav Sengupta <go...@gmail.com>
> *Date: *Thursday, March 3, 2022 at 2:24 AM
> *To: *Anil Dasari <ad...@guidewire.com>
> *Cc: *Yang,Jie(INF) <ya...@baidu.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi,
>
>
>
> I do not think that you are doing anything particularly concerning
> here.
>
>
>
> There is a setting in Spark which limits the number of records that we can
> write out at a time; you can try that. The other thing that you can try is
> to ensure that the number of partitions is higher (just like you suggested).
> Let me know how things are going on your end.
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Thu, Mar 3, 2022 at 8:37 AM Anil Dasari <ad...@guidewire.com> wrote:
>
> Answers in the context. Thanks.
>
>
>
> *From: *Gourav Sengupta <go...@gmail.com>
> *Date: *Thursday, March 3, 2022 at 12:13 AM
> *To: *Anil Dasari <ad...@guidewire.com>
> *Cc: *Yang,Jie(INF) <ya...@baidu.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> I was trying to work out things for a while yesterday, but may need your
> kind help.
>
>
>
> Can you please share the code for the following steps?
>
> ·  Create DF from hive (from step 3)
>
> [AD] sparkSession.table(<tablename>)
>
>
>
> ·  Deduplicate spark DF by primary key
>
> [AD] dataFrame.dropDuplicates(<primary key columns>)
>
>
>
> ·  Write DF to s3 in parquet format
>
> [AD] dataFrame.write.mode(saveMode).parquet(path)
>
>
>
> ·  Write metadata to s3
>
> [AD] metadata in json written to s3 using aws sdk
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <ad...@guidewire.com> wrote:
>
> Second attempt.
>
>
>
> Any suggestions to troubleshoot and fix the problem? Thanks in advance.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Anil Dasari <ad...@guidewire.com>
> *Date: *Wednesday, March 2, 2022 at 7:00 AM
> *To: *Gourav Sengupta <go...@gmail.com>, Yang,Jie(INF) <
> yangjie01@baidu.com>
> *Cc: *user@spark.apache.org <us...@spark.apache.org>
> *Subject: *Re: {EXT} Re: Spark Parquet write OOM
>
> Hi Gourav and Yang
>
> Thanks for the response.
>
>
>
> Please find the answers below.
>
>
>
> 1. What is the version of SPARK you are using?
>
> [AD] : Spark 2.4.7 (EMR 5.33.1)
>
>
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> [AD] No. Only one new column is added. Our flow is:
>
>    1. Read Avro data from Kafka
>    2. Deserialize the Avro records and add a new column to the RDD
>    3. Create a Spark DataFrame (DF) against the latest schema (the evolved
>    Avro schema) and persist it to Hive (checkpointing)
>    4. Create a DF from Hive (from step 3)
>    5. Deduplicate the Spark DF by primary key
>    6. Write the DF to S3 in Parquet format
>    7. Write metadata to S3
>
>
>
> The failure is from the Spark batch job.
>
>
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> [AD] : Data volume is fixed, as it is a batch job.
>
>
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> [AD] We are running one core node and 50 task nodes, i.e. 51 nodes in
> total. Each node can create 2 executors (2 CPU cores and 8 GB memory).
>
>
>
> 5. Can you show the list of transformations that you are running?
>
> [AD] No explicit transformations other than the basic map transformations
> required to create the DataFrame from the Avro record RDD.
>
>
>
> Please let me know if you have any questions.
>
>
>
> Regards,
>
> Anil
>
>
>
> *From: *Gourav Sengupta <go...@gmail.com>
> *Date: *Wednesday, March 2, 2022 at 1:07 AM
> *To: *Yang,Jie(INF) <ya...@baidu.com>
> *Cc: *Anil Dasari <ad...@guidewire.com>, user@spark.apache.org <
> user@spark.apache.org>
> *Subject: *{EXT} Re: Spark Parquet write OOM
>
> Hi Anil,
>
>
>
> before jumping to the quick symptomatic fix, can we try to understand the
> issues?
>
>
>
> 1. What is the version of SPARK you are using?
>
> 2. Are you doing a lot of in-memory transformations like adding columns,
> or running joins, or UDFs thus increasing the size of the data before
> writing out?
>
> 3. Is your pipeline going to change or evolve soon, or the data volumes
> going to vary, or particularly increase, over time?
>
> 4. What is the memory that you are having in your executors, and drivers?
>
> 5. Can you show the list of transformations that you are running ?
>
>
>
>
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
> On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <ya...@baidu.com> wrote:
>
> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase
> the capacity available to DirectByteBuffer by configuring
> `-XX:MaxDirectMemorySize`, which is a Java option.
>
>
>
> However, we had better check the size of the memory being allocated, because
> `-XX:MaxDirectMemorySize` and `-Xmx` should have the same capacity by
> default.

Re: {EXT} Re: Spark Parquet write OOM

Posted by Anil Dasari <ad...@guidewire.com>.
I am not sure how to set the records limit. Let me check. I couldn’t find a Parquet row group size configuration in Spark.
For now, I increased the number of shuffle partitions to reduce the records processed per task to avoid the OOM.

Regards,
Anil
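
For reference, the Parquet row group size is a parquet-hadoop property, `parquet.block.size` (in bytes, 128 MiB by default), rather than a Spark SQL option. One way it is commonly passed is through Spark's `spark.hadoop.` prefix, which copies a property into the Hadoop configuration. A small sketch in plain Java that builds such a flag; the class name and the 64 MiB value are assumptions for illustration.

```java
// Hypothetical helper for expressing a Parquet row group size as a Spark flag.
class RowGroupConf {

    // Converts mebibytes to bytes, the unit parquet.block.size expects.
    static long mebibytes(long mib) {
        return mib * 1024L * 1024L;
    }

    // "spark.hadoop." forwards the property to the Hadoop configuration.
    static String rowGroupSizeArg(long bytes) {
        return "--conf spark.hadoop.parquet.block.size=" + bytes;
    }

    public static void main(String[] args) {
        // 64 MiB is an assumed value, half of the 128 MiB default.
        System.out.println(rowGroupSizeArg(mebibytes(64)));
    }
}
```

The stack trace shows the failure while a row group is being flushed (`flushRowGroupToStore`), so a smaller row group may shrink the buffers involved, though that is untested here.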


Re: {EXT} Re: Spark Parquet write OOM

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Anil,

any chance you tried setting the limit on the number of records to be
written out at a time?

Regards,
Gourav

> *发件人**: *Anil Dasari <ad...@guidewire.com>
> *日期**: *2022年3月2日 星期三 09:45
> *收件人**: *"user@spark.apache.org" <us...@spark.apache.org>
> *主题**: *Spark Parquet write OOM
>
>
>
> Hello everyone,
>
>
>
> We are writing Spark Data frame to s3 in parquet and it is failing with
> below exception.
>
>
>
> I wanted to try following to avoid OOM
>
>
>
>    1. increase the default sql shuffle partitions to reduce load on
>    parquet writer tasks to avoid OOM and
>    2. Increase user memory (reduce memory fraction) to have more memory
>    for other data structures assuming parquet writer uses user memory.
>
>
>
> I am not sure if these fixes the OOM issue. So wanted to reach out
> community for any suggestions. Please let me know.
>
>
>
> Exception:
>
>
>
> org.apache.spark.SparkException: Task failed while writing rows.
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>
>          at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>          at org.apache.spark.scheduler.Task.run(Task.scala:123)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>          at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>
>          at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>          at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>          at java.lang.Thread.run(Thread.java:750)
>
> Caused by: java.lang.OutOfMemoryError
>
>          at sun.misc.Unsafe.allocateMemory(Native Method)
>
>          at java.nio.DirectByteBuffer.<init>(
> http://DirectByteBuffer.java:127)
>
>          at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>
>          at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>
>          at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>
>          at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>
>          at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>
>          at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>
>          at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>
>          at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>
>          at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>
>          at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>
>          at
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
>
>          at
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
>
>          at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>
>          at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>
>          at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>
>          at
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>
>          at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>
>          ... 10 more
>
>          Suppressed: java.io.IOException: The file being written is in an
> invalid state. Probably caused by an error thrown previously. Current
> state: BLOCK
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)
>
>
>
> Regards,
>
> Anil
>
>

Re: {EXT} Re: Spark Parquet write OOM

Posted by Anil Dasari <ad...@guidewire.com>.
Hi Gourav,
I tried increasing the number of shuffle partitions and also using higher executor memory. Neither worked.

Regards


Re: {EXT} Re: Spark Parquet write OOM

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

I do not think that you are doing anything particularly concerning
here.

There is a setting in SPARK which limits the number of records that we can
write out at a time; you can try that. The other thing that you can try is
to ensure that there are more partitions (just like you suggested).
Let me know how things are going on your end.


Regards,
Gourav Sengupta
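
The message above does not name the setting. One candidate, assumed here, is `spark.sql.files.maxRecordsPerFile`, which caps the number of records written to a single output file; `spark.sql.shuffle.partitions` controls the partition count after a shuffle such as the one `dropDuplicates` triggers. The following is only a sketch of how the two suggestions could be passed to `spark-submit`. The class name, method names, and values are hypothetical placeholders, not recommendations, and nothing in the thread confirms that these settings resolve the OOM.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WriteTuningConf {

    /** Settings assumed for illustration; the values are placeholders. */
    static Map<String, String> settings(int shufflePartitions, long maxRecordsPerFile) {
        Map<String, String> conf = new LinkedHashMap<>();
        // More shuffle partitions means more, smaller tasks after the dropDuplicates shuffle.
        conf.put("spark.sql.shuffle.partitions", Integer.toString(shufflePartitions));
        // Caps the records written to a single output file (zero or negative means no limit).
        conf.put("spark.sql.files.maxRecordsPerFile", Long.toString(maxRecordsPerFile));
        return conf;
    }

    /** Renders the settings as spark-submit arguments: --conf key=value ... */
    static List<String> toSubmitArgs(Map<String, String> conf) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            args.add("--conf");
            args.add(entry.getKey() + "=" + entry.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        // Prints the arguments on one line, ready to paste after "spark-submit".
        System.out.println(String.join(" ", toSubmitArgs(settings(800, 1_000_000L))));
    }
}
```

Keeping the settings in an insertion-ordered map makes it easy to try one change at a time and see exactly which arguments were passed on each run.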



Re: {EXT} Re: Spark Parquet write OOM

Posted by Anil Dasari <ad...@guidewire.com>.
Answers are inline below. Thanks.

From: Gourav Sengupta <go...@gmail.com>
Date: Thursday, March 3, 2022 at 12:13 AM
To: Anil Dasari <ad...@guidewire.com>
Cc: Yang,Jie(INF) <ya...@baidu.com>, user@spark.apache.org <us...@spark.apache.org>
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Anil,

I was trying to work out things for a while yesterday, but may need your kind help.

Can you please share the code for the following steps?
*  Create DF from hive (from step 3)
[AD] sparkSession.table(<tablename>)

*  Deduplicate spark DF by primary key
[AD] dataFrame.dropDuplicates(<primary key columns>)

*  Write DF to s3 in parquet format
[AD] dataFrame.write.mode(saveMode).parquet(path)

*  Write metadata to s3
[AD] metadata in json written to s3 using aws sdk

Regards,
Gourav Sengupta

On Wed, Mar 2, 2022 at 11:25 PM Anil Dasari <ad...@guidewire.com> wrote:
2nd attempt.

Any suggestions to troubleshoot and fix the problem? Thanks in advance.

Regards,
Anil

From: Anil Dasari <ad...@guidewire.com>
Date: Wednesday, March 2, 2022 at 7:00 AM
To: Gourav Sengupta <go...@gmail.com>, Yang,Jie(INF) <ya...@baidu.com>
Cc: user@spark.apache.org <us...@spark.apache.org>
Subject: Re: {EXT} Re: Spark Parquet write OOM
Hi Gourav and Yang
Thanks for the response.

Please find the answers below.

1. What is the version of SPARK you are using?
[AD] : Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations like adding columns, or running joins, or UDFs thus increasing the size of the data before writing out?
[AD] No. Only one new column is added. Our flow is:

  1.  Read Avro data from Kafka
  2.  Deserialize the Avro records and add the new column to the RDD
  3.  Create a Spark dataframe (DF) against the latest schema (Avro evolved schema) and persist it to Hive (checkpointing)
  4.  Create a DF from Hive (from step 3)
  5.  Deduplicate the Spark DF by primary key
  6.  Write the DF to S3 in Parquet format
  7.  Write metadata to S3

The failure is from the Spark batch job.

3. Is your pipeline going to change or evolve soon, or the data volumes going to vary, or particularly increase, over time?
[AD] : Data volume is fixed, as it is a batch job.

4. What is the memory that you are having in your executors, and drivers?
[AD] We are running one core node and 50 task nodes, i.e. 51 nodes in total. Each node can create 2 executors (2-core CPU and 8 GB memory).

5. Can you show the list of transformations that you are running ?
[AD] No explicit transformations other than the basic map transformations required to create the dataframe from the Avro record RDD.

Please let me know if you have any questions.

Regards,
Anil

From: Gourav Sengupta <go...@gmail.com>
Date: Wednesday, March 2, 2022 at 1:07 AM
To: Yang,Jie(INF) <ya...@baidu.com>
Cc: Anil Dasari <ad...@guidewire.com>, user@spark.apache.org <us...@spark.apache.org>
Subject: {EXT} Re: Spark Parquet write OOM
Hi Anil,

Before jumping to the quick symptomatic fix, can we try to understand the issues?

1. What is the version of SPARK you are using?
2. Are you doing a lot of in-memory transformations like adding columns, or running joins, or UDFs thus increasing the size of the data before writing out?
3. Is your pipeline going to change or evolve soon, or the data volumes going to vary, or particularly increase, over time?
4. What is the memory that you are having in your executors, and drivers?
5. Can you show the list of transformations that you are running ?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <ya...@baidu.com> wrote:
This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`, which is a Java option.

However, we'd better check how much memory is being allocated, because by default `-XX:MaxDirectMemorySize` has the same capacity as `-Xmx`.
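
To make the direct-memory point above concrete, here is a minimal pure-JDK sketch (not from the thread) that reports the maximum heap size and how many bytes the JVM's "direct" buffer pool currently holds, then allocates an off-heap buffer through `ByteBuffer.allocateDirect`, the call that appears in the stack trace. The class and method names are hypothetical. In a Spark job, a flag such as `-XX:MaxDirectMemorySize` would typically reach the executors through `spark.executor.extraJavaOptions`; whether raising it helps in this case is not established in the thread.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

class DirectMemoryProbe {

    /** Maximum heap the JVM will attempt to use (roughly the -Xmx value). */
    static long maxHeapBytes() {
        return Runtime.getRuntime().maxMemory();
    }

    /** Bytes currently held by direct buffers, or -1 if the pool is not exposed. */
    static long directBytesInUse() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    /** Allocates an off-heap buffer of the given size in MiB. */
    static ByteBuffer allocateDirectMiB(int mib) {
        return ByteBuffer.allocateDirect(mib * 1024 * 1024);
    }

    public static void main(String[] args) {
        long before = directBytesInUse();
        ByteBuffer buffer = allocateDirectMiB(1);
        long after = directBytesInUse();
        System.out.println("max heap bytes: " + maxHeapBytes());
        System.out.println("direct bytes before/after: " + before + " / " + after);
        System.out.println("buffer is direct: " + buffer.isDirect());
    }
}
```

Logging these numbers from a task shortly before the write gives a rough picture of how much direct memory is in use compared with the heap limit.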


From: Anil Dasari <ad...@guidewire.com>
Date: Wednesday, March 2, 2022, 09:45
To: "user@spark.apache.org" <us...@spark.apache.org>
Subject: Spark Parquet write OOM

Hello everyone,

We are writing a Spark DataFrame to S3 in Parquet format, and it is failing with the exception below.

I wanted to try the following to avoid the OOM:


  1.  Increase the default SQL shuffle partitions to reduce the load on the Parquet writer tasks, and
  2.  Increase user memory (reduce the memory fraction) to leave more memory for other data structures, assuming the Parquet writer uses user memory.

I am not sure whether these would fix the OOM issue, so I wanted to reach out to the community for suggestions. Please let me know.

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
         at org.apache.spark.scheduler.Task.run(Task.scala:123)
         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
         at sun.misc.Unsafe.allocateMemory(Native Method)
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
         at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
         at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
         at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
         at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
         at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
         at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
         at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
         at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
         at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
         at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
         ... 10 more
         Suppressed: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: BLOCK
                 at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
                 at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
                 at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)

Regards,
Anil

Re: {EXT} Re: Spark Parquet write OOM

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Anil,

I was trying to work out things for a while yesterday, but may need your
kind help.

Can you please share the code for the following steps?
- Create DF from hive (from step #c)
- Deduplicate spark DF by primary key
- Write DF to s3 in parquet format
- Write metadata to s3

Regards,
Gourav Sengupta


Re: {EXT} Re: Spark Parquet write OOM

Posted by Anil Dasari <ad...@guidewire.com>.
2nd attempt.

Any suggestions on how to troubleshoot and fix the problem? Thanks in advance.

Regards,
Anil


Re: {EXT} Re: Spark Parquet write OOM

Posted by Anil Dasari <ad...@guidewire.com>.
Hi Gourav and Yang,
Thanks for the response.

Please find the answers below.

1. Which version of Spark are you using?
[AD] Spark 2.4.7 (EMR 5.33.1)

2. Are you doing a lot of in-memory transformations, such as adding columns, running joins, or applying UDFs, that increase the size of the data before writing it out?
[AD] No. Only one new column is added. Our flow is:

  1.  Read Avro data from Kafka
  2.  Deserialize the Avro records and add the new column to the RDD
  3.  Create a Spark DataFrame (DF) against the latest schema (the evolved Avro schema) and persist it to Hive (checkpointing)
  4.  Create a DF from Hive (from step #c)
  5.  Deduplicate the Spark DF by primary key
  6.  Write the DF to S3 in Parquet format
  7.  Write metadata to S3

The failure is from the Spark batch job.

3. Is your pipeline going to change or evolve soon, or are the data volumes going to vary, or in particular increase, over time?
[AD] The data volume is fixed, as it is a batch job.

4. How much memory do your executors and driver have?
[AD] We are running one core node and 50 task nodes, i.e. 51 nodes in total. Each node can create 2 executors (2-core CPU and 8 GB memory).

5. Can you show the list of transformations that you are running?
[AD] No explicit transformations other than the basic map transformations required to create the DataFrame from the RDD of Avro records.
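
The cluster figures in answer 4 can be related to off-heap headroom with a rough calculation. This is a sketch under stated assumptions, not a diagnosis: it assumes executors run in YARN containers and that `spark.executor.memoryOverhead` is left at its default of 10% of executor memory with a 384 MiB floor. The thread does not say how the 8 GB is divided, so the 3072 MiB executor memory and both helper names are hypothetical.

```python
def yarn_container_mib(executor_memory_mib, overhead_mib=None):
    """Total container size: executor heap plus memory overhead (MiB)."""
    if overhead_mib is None:
        # Default overhead: 10% of executor memory, but at least 384 MiB.
        overhead_mib = max(384, int(executor_memory_mib * 0.10))
    return executor_memory_mib + overhead_mib


def concurrent_tasks(executor_cores, task_cpus=1):
    """Tasks that can run at once in one executor, each with its own writer."""
    return executor_cores // task_cpus


heap = 3072  # hypothetical executor memory, not a value from the thread
container = yarn_container_mib(heap)
print(f"container={container} MiB, off-heap headroom={container - heap} MiB, "
      f"writers at once={concurrent_tasks(2)}")
```

Under these assumptions the overhead is the only room left for direct buffers, and it is shared by every write task running in the executor.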

Please let me know if you have any questions.

Regards,
Anil


Re: Spark Parquet write OOM

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Anil,

Before jumping to a quick symptomatic fix, can we try to understand the
issue?

1. Which version of Spark are you using?
2. Are you doing a lot of in-memory transformations, such as adding columns,
running joins, or applying UDFs, that increase the size of the data before
writing it out?
3. Is your pipeline going to change or evolve soon, or are the data volumes
going to vary, or in particular increase, over time?
4. How much memory do your executors and driver have?
5. Can you show the list of transformations that you are running?




Regards,
Gourav Sengupta


On Wed, Mar 2, 2022 at 3:18 AM Yang,Jie(INF) <ya...@baidu.com> wrote:

> This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase
> the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`,
> which is a Java option.
>
>
>
> However, we had better check the size of the memory being allocated,
> because `-XX:MaxDirectMemorySize` and `-Xmx` should have the same
> capacity by default.
>
>
>
>
>
> *From: *Anil Dasari <ad...@guidewire.com>
> *Date: *Wednesday, March 2, 2022 at 09:45
> *To: *"user@spark.apache.org" <us...@spark.apache.org>
> *Subject: *Spark Parquet write OOM
>
>
>
> Hello everyone,
>
>
>
> We are writing a Spark DataFrame to S3 in Parquet format, and the write
> is failing with the exception below.
>
>
>
> I wanted to try the following to avoid the OOM:
>
>
>
>    1. Increase the default SQL shuffle partitions to reduce the load on
>    the Parquet writer tasks, and
>    2. Increase user memory (reduce the memory fraction) to leave more
>    memory for other data structures, assuming the Parquet writer uses
>    user memory.
>
>
>
> I am not sure whether these will fix the OOM issue, so I wanted to reach
> out to the community for suggestions. Please let me know.
>
>
>
> Exception:
>
>
>
> org.apache.spark.SparkException: Task failed while writing rows.
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>
>          at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>
>          at org.apache.spark.scheduler.Task.run(Task.scala:123)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>
>          at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>
>          at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>          at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>          at java.lang.Thread.run(Thread.java:750)
>
> Caused by: java.lang.OutOfMemoryError
>
>          at sun.misc.Unsafe.allocateMemory(Native Method)
>
>          at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
>
>          at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>
>          at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
>
>          at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
>
>          at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
>
>          at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
>
>          at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
>
>          at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
>
>          at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
>
>          at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
>
>          at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>
>          at
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
>
>          at
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
>
>          at
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
>
>          at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
>
>          at
> org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
>
>          at
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
>
>          at
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>
>          at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
>
>          at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>
>          ... 10 more
>
>          Suppressed: java.io.IOException: The file being written is in an
> invalid state. Probably caused by an error thrown previously. Current
> state: BLOCK
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
>
>                  at
> org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)
>
>
>
> Regards,
>
> Anil
>

Re: Spark Parquet write OOM

Posted by "Yang,Jie(INF)" <ya...@baidu.com>.
This is a DirectByteBuffer OOM, so plan 2 may not work. We can increase the DirectByteBuffer capacity by configuring `-XX:MaxDirectMemorySize`, which is a Java option.
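
As a rough sketch of how that Java option could be passed to the executors, the snippet below builds the `spark-submit` arguments using Spark's `spark.executor.extraJavaOptions` setting. The helper name `direct_memory_conf`, the `2g` value, and `your_job.py` are placeholders chosen for illustration and are not taken from this thread. Whether raising the limit resolves this particular OOM is not confirmed here.

```python
from typing import List


def direct_memory_conf(max_direct_memory: str) -> List[str]:
    """Return spark-submit arguments that pass -XX:MaxDirectMemorySize to executor JVMs."""
    java_opts = f"-XX:MaxDirectMemorySize={max_direct_memory}"
    return ["--conf", f"spark.executor.extraJavaOptions={java_opts}"]


# "2g" and "your_job.py" are hypothetical placeholder values.
command = ["spark-submit", *direct_memory_conf("2g"), "your_job.py"]
print(" ".join(command))
```

The same key/value pair can also be placed in `spark-defaults.conf` instead of on the command line.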

However, we had better check how much memory is being allocated, because by default `-XX:MaxDirectMemorySize` should have the same capacity as `-Xmx`.
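
To illustrate why plan 2 may not help, here is a small arithmetic sketch of how Spark's unified memory manager divides the executor heap. It assumes the documented model: 300 MiB of the heap is reserved, and `spark.memory.fraction` (default 0.6 in Spark 2.x and later) splits the remainder between Spark's execution/storage memory and user memory. The function name and the 4096 MiB heap size are illustrative assumptions only.

```python
RESERVED_MB = 300  # fixed amount of heap that Spark sets aside


def heap_regions(executor_heap_mb: float, memory_fraction: float = 0.6) -> dict:
    """Split an executor heap into Spark's reserved, unified, and user regions (MiB)."""
    usable = executor_heap_mb - RESERVED_MB
    unified = usable * memory_fraction  # execution + storage memory
    user = usable - unified             # user data structures and internal metadata
    return {"reserved_mb": RESERVED_MB, "unified_mb": unified, "user_mb": user}


# Hypothetical 4 GiB executor heap, default fraction versus a reduced one.
for fraction in (0.6, 0.4):
    print(fraction, heap_regions(4096, fraction))
```

Lowering the fraction only moves the boundary inside the heap. The three regions always add up to the heap size, and none of them covers direct buffers, which are allocated outside the heap.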


From: Anil Dasari <ad...@guidewire.com>
Date: Wednesday, March 2, 2022 09:45
To: "user@spark.apache.org" <us...@spark.apache.org>
Subject: Spark Parquet write OOM

Hello everyone,

We are writing a Spark DataFrame to S3 in Parquet format, and it is failing with the exception below.

I wanted to try the following to avoid the OOM:

  1.  Increase the default SQL shuffle partitions to reduce the load on the Parquet writer tasks, and
  2.  Increase user memory (reduce the memory fraction) to leave more memory for other data structures, assuming the Parquet writer uses user memory.

I am not sure whether these will fix the OOM issue, so I wanted to reach out to the community for suggestions. Please let me know.

Exception:

org.apache.spark.SparkException: Task failed while writing rows.
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
         at org.apache.spark.scheduler.Task.run(Task.scala:123)
         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.OutOfMemoryError
         at sun.misc.Unsafe.allocateMemory(Native Method)
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
         at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
         at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
         at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
         at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
         at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
         at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
         at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
         at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
         at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
         at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
         at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
         at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
         at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
         at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
         ... 10 more
         Suppressed: java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: BLOCK
                 at org.apache.parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:168)
                 at org.apache.parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:160)
                 at org.apache.parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:291)

Regards,
Anil