Posted to user@spark.apache.org by SLiZn Liu <sl...@gmail.com> on 2015/10/14 06:18:47 UTC

OutOfMemoryError When Reading Many json Files

Hey Spark Users,

I kept getting java.lang.OutOfMemoryError: Java heap space as I read a
massive number of json files, iteratively via read.json(). Even though the
resulting RDD is rather small, I still get the OOM error. The brief structure
of my program reads as follows, in pseudo-code:

file_path_list.map { jsonFile: String =>
  sqlContext.read.json(jsonFile)
    .select($"some", $"fields")
    .withColumn("new_col", some_transformations($"col"))
    .rdd.map( (x: Row) => (k, v) )
    .combineByKey() // groups one column into item lists, keyed by another column
}.reduce( (i, j) => i.union(j) )
 .combineByKey() // combines the results from all json files
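
For concreteness, a runnable sketch of roughly what this looks like with
explicit combineByKey arguments is below; the column names, string types,
list-building combiners and file names are placeholders, and the
withColumn/UDF step is omitted:

import org.apache.spark.sql.Row
import sqlContext.implicits._ // for the $"col" syntax; sc and sqlContext as in the shell

// Hypothetical file list and column names, purely for illustration.
val file_path_list: Seq[String] = Seq("part-0001.json", "part-0002.json")

val combined = file_path_list.map { jsonFile =>
  sqlContext.read.json(jsonFile)
    .select($"key", $"value")                       // assumed string columns
    .rdd
    .map { row: Row => (row.getAs[String]("key"), row.getAs[String]("value")) }
    .combineByKey(
      (v: String) => List(v),                       // start a list for a new key
      (acc: List[String], v: String) => v :: acc,   // add a value to the key's list
      (a: List[String], b: List[String]) => a ::: b // merge lists across partitions
    )
}.reduce((i, j) => i.union(j))
 .reduceByKey(_ ::: _)                              // outer combine: one list per key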

I confess some of the json files are gigabytes in size, yet the combined RDD
comes to only a few megabytes. I’m not familiar with the under-the-hood
mechanism, but my intuitive understanding of how the code executes is: read
the files one at a time (where I can easily change map to foreach when
iterating over file_path_list, if that’s what it takes), do the inner
transformation on the DF and combine, then reduce and do the outer combine
immediately, which shouldn’t require holding the RDDs generated from all the
files in memory at once. Obviously, since my code raises the OOM error, I
must have missed something important.

From the debug log, I can tell the OOM error happens when reading the same
file every time, which is a modest 2GB in size, while driver.memory is set
to 13GB and the available memory before the code runs is around 8GB, on my
standalone machine running as “local[8]”.

To work around this, I also tried initializing an empty RDD variable to
accumulate everything into, iteratively reading one file at a time with
foreach and, instead of reduce, simply combining each RDD generated from the
json files into it, but the OOM error remains.
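
That variant, roughly sketched (again with placeholder columns and types,
reusing the placeholder file_path_list from the sketch above):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import sqlContext.implicits._

var accumulated: RDD[(String, List[String])] = sc.emptyRDD[(String, List[String])]

file_path_list.foreach { jsonFile =>
  val perFile = sqlContext.read.json(jsonFile)   // still scans the file to infer a schema
    .select($"key", $"value")
    .rdd
    .map { row: Row => (row.getAs[String]("key"), List(row.getAs[String]("value"))) }
    .reduceByKey(_ ::: _)                        // per-file combine
  accumulated = accumulated.union(perFile)       // union is lazy; it only extends the lineage
}

val result = accumulated.reduceByKey(_ ::: _)    // outer combine over all files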

Other configurations (applied roughly as in the sketch after this list):

   - set("spark.storage.memoryFraction", "0.1") // no RDD caching is used
   - set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
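
For reference, this is roughly how those settings are applied; the app name
is a placeholder and the memory note just mirrors the numbers above:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("json-combine")   // placeholder name
  .setMaster("local[8]")
  .set("spark.storage.memoryFraction", "0.1")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// spark.driver.memory normally has to be set before the JVM starts,
// e.g. via spark-submit --driver-memory 13g, not on an already-running SparkConf.
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)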

Any suggestions, other than scaling the Spark cluster up or out?

BR,
Todd Leo

Re: OutOfMemoryError When Reading Many json Files

Posted by SLiZn Liu <sl...@gmail.com>.
Yes, it only went wrong when processing the large file. I removed the
transformations on the DF and it worked just fine, but even a simple filter
operation on the DF became the straw that broke the camel’s back. That’s
confusing.
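
The filter was nothing more elaborate than something like this (the path,
column name and literal are made up):

import sqlContext.implicits._

val filtered = sqlContext.read.json("large.json")  // placeholder path
  .select($"some", $"fields")
  .filter($"some" === "expected_value")            // hypothetical predicate
filtered.count()                                   // force evaluation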

On Wed, Oct 14, 2015 at 2:11 PM Deenar Toraskar <de...@gmail.com>
wrote:

> Hi
>
> Why don't you check whether you can process the large file standalone
> first, and then do the outer loop?
>
> sqlContext.read.json(jsonFile)
>   .select($"some", $"fields")
>   .withColumn("new_col", some_transformations($"col"))
>   .rdd.map( (x: Row) => (k, v) )
>   .combineByKey()
>
> Deenar
>

Re: OutOfMemoryError When Reading Many json Files

Posted by Deenar Toraskar <de...@gmail.com>.
Hi

Why don't you check whether you can process the large file standalone
first, and then do the outer loop?

sqlContext.read.json(jsonFile)
  .select($"some", $"fields")
  .withColumn("new_col", some_transformations($"col"))
  .rdd.map( (x: Row) => (k, v) )
  .combineByKey()
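
Something along these lines, with a placeholder path and made-up column
names, would exercise just the large file on its own:

import org.apache.spark.sql.Row
import sqlContext.implicits._

val big = sqlContext.read.json("/path/to/large.json")  // placeholder path
  .select($"key", $"value")                            // placeholder columns
  .rdd
  .map { row: Row => (row.getAs[String]("key"), row.getAs[String]("value")) }
big.count()  // force a full pass over just this one file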

Deenar
