Posted to issues@spark.apache.org by "Juliet Hougland (JIRA)" <ji...@apache.org> on 2016/01/21 20:29:39 UTC

[jira] [Commented] (SPARK-4073) Parquet+Snappy can cause significant off-heap memory usage

    [ https://issues.apache.org/jira/browse/SPARK-4073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111148#comment-15111148 ] 

Juliet Hougland commented on SPARK-4073:
----------------------------------------

I have run into a related problem. I am reading a Snappy-compressed Parquet file via PySpark and get the following OOM error:

16/01/21 11:43:10 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/anaconda2/bin/python,5,main]
java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Bits.java:658)
	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
	at parquet.hadoop.codec.SnappyDecompressor.setInput(SnappyDecompressor.java:102)
	at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:46)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at java.io.DataInputStream.readFully(DataInputStream.java:169)
	at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
	at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
	at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
	at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
	at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
	at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
	at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
	at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
	at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
	at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
	at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
	at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
	at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
	at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
	at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:205)
	at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
	at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)



> Parquet+Snappy can cause significant off-heap memory usage
> ----------------------------------------------------------
>
>                 Key: SPARK-4073
>                 URL: https://issues.apache.org/jira/browse/SPARK-4073
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.1.0
>            Reporter: Patrick Wendell
>            Priority: Critical
>
> The parquet snappy codec allocates off-heap buffers for decompression[1]. In one case the observed size of these buffers was high enough to add several GB of data to the overall virtual memory usage of the Spark executor process. I don't understand enough about our use of Snappy to fully grok how much data we would _expect_ to be present in these buffers at any given time, but I can say a few things.
> 1. The dataset had individual rows that were fairly large, e.g. megabytes.
> 2. Direct buffers are not cleaned up until GC events, and overall there was not much heap contention. So maybe they just weren't being cleaned.
> I opened PARQUET-118 to see if they can provide an option to use on-heap buffers for decompression. In the meantime, we could consider changing the default back to gzip, or we could do nothing (not sure how many other users will hit this).
> [1] https://github.com/apache/incubator-parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java#L28
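
For readers hitting the same "Direct buffer memory" error: the trace above fails in java.nio.Bits.reserveMemory, which is where the JVM enforces its limit on direct buffer allocations. One setting that exists for this is the JVM flag -XX:MaxDirectMemorySize, which Spark can pass to executors through spark.executor.extraJavaOptions and to the driver through --driver-java-options. The Python sketch below only assembles such a spark-submit command line. The helper name, the 2g value, and the script name are hypothetical, and nothing in this thread confirms that raising the limit resolves the problem; it may only move the point at which memory runs out.

```python
# Sketch only: build spark-submit arguments that set the JVM limit on
# direct (off-heap) buffer memory. The helper name and the values used
# below are hypothetical and are not taken from the thread.

def direct_memory_submit_args(max_direct_memory, script):
    """Return a spark-submit argument list that passes
    -XX:MaxDirectMemorySize to the executor JVMs and the driver JVM."""
    jvm_flag = "-XX:MaxDirectMemorySize={}".format(max_direct_memory)
    return [
        "spark-submit",
        # Executors: extra JVM options are supplied as a Spark conf entry.
        "--conf", "spark.executor.extraJavaOptions={}".format(jvm_flag),
        # Driver: spark-submit has a dedicated option for JVM flags.
        "--driver-java-options", jvm_flag,
        script,
    ]

args = direct_memory_submit_args("2g", "read_parquet.py")
print(" ".join(args))
```

The printed line can be pasted into a shell. Treat the size as a starting point to experiment with, not a recommendation.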


