You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Alec Swan <al...@gmail.com> on 2017/11/14 00:22:58 UTC

Process large JSON file without causing OOM

Hello,

I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
format. Effectively, my Java service starts up an embedded Spark cluster
(master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
keep getting OOM errors with large (~1GB) files.

I've tried different ways to reduce memory usage, e.g. by partitioning data
with dataSet.partitionBy("customer).save(filePath), or capping memory usage
by setting spark.executor.memory=1G, but to no vail.

I am wondering if there is a way to avoid OOM besides splitting the source
JSON file into multiple smaller ones and processing the small ones
individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
in it's entirety before converting it to ORC (columnar)? If so, would it
make sense to create a custom receiver that reads the Snappy file and use
Spark streaming for ORC conversion?

Thanks,

Alec

Re: Process large JSON file without causing OOM

Posted by Alec Swan <al...@gmail.com>.
Pinging back to see if anybody could provide me with some pointers on hot
to stream/batch JSON-to-ORC conversion in Spark SQL or why I get an OOM
dump with such small memory footprint?

Thanks,

Alec

On Wed, Nov 15, 2017 at 11:03 AM, Alec Swan <al...@gmail.com> wrote:

> Thanks Steve and Vadim for the feedback.
>
> @Steve, are you suggesting creating a custom receiver and somehow piping
> it through Spark Streaming/Spark SQL? Or are you suggesting creating
> smaller datasets from the stream and using my original code to process
> smaller datasets? It'd be very helpful for a novice, like myself, if you
> could provide code samples or links to docs/articles.
>
> @Vadim, I ran my test with local[1] and got OOM in the same place. What
> puzzles me is that when I expect the heap dump with VisualVM (see below) it
> says that the heap is pretty small ~35MB. I am running my test with
> "-Xmx10G -Dspark.executor.memory=6g  -Dspark.driver.memory=6g" JVM opts and
> I can see them reflected in Spark UI. Am I missing some memory settings?
>
>     Date taken: Wed Nov 15 10:46:06 MST 2017
>     File: /tmp/java_pid69786.hprof
>     File size: 59.5 MB
>
>     Total bytes: 39,728,337
>     Total classes: 15,749
>     Total instances: 437,979
>     Classloaders: 123
>     GC roots: 2,831
>     Number of objects pending for finalization: 5,198
>
>
> Thanks,
>
> Alec
>
> On Wed, Nov 15, 2017 at 11:15 AM, Vadim Semenov <
> vadim.semenov@datadoghq.com> wrote:
>
>> There's a lot of off-heap memory involved in decompressing Snappy,
>> compressing ZLib.
>>
>> Since you're running using `local[*]`, you process multiple tasks
>> simultaneously, so they all might consume memory.
>>
>> I don't think that increasing heap will help, since it looks like you're
>> hitting system memory limits.
>>
>> I'd suggest trying to run with `local[2]` and checking what's the memory
>> usage of the jvm process.
>>
>> On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan <al...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>>> format. Effectively, my Java service starts up an embedded Spark cluster
>>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>>> keep getting OOM errors with large (~1GB) files.
>>>
>>> I've tried different ways to reduce memory usage, e.g. by partitioning
>>> data with dataSet.partitionBy("customer).save(filePath), or capping
>>> memory usage by setting spark.executor.memory=1G, but to no vail.
>>>
>>> I am wondering if there is a way to avoid OOM besides splitting the
>>> source JSON file into multiple smaller ones and processing the small ones
>>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>>> in it's entirety before converting it to ORC (columnar)? If so, would it
>>> make sense to create a custom receiver that reads the Snappy file and use
>>> Spark streaming for ORC conversion?
>>>
>>> Thanks,
>>>
>>> Alec
>>>
>>
>>
>

Re: Process large JSON file without causing OOM

Posted by Alec Swan <al...@gmail.com>.
Thanks Steve and Vadim for the feedback.

@Steve, are you suggesting creating a custom receiver and somehow piping it
through Spark Streaming/Spark SQL? Or are you suggesting creating smaller
datasets from the stream and using my original code to process smaller
datasets? It'd be very helpful for a novice, like myself, if you could
provide code samples or links to docs/articles.

@Vadim, I ran my test with local[1] and got OOM in the same place. What
puzzles me is that when I expect the heap dump with VisualVM (see below) it
says that the heap is pretty small ~35MB. I am running my test with
"-Xmx10G -Dspark.executor.memory=6g  -Dspark.driver.memory=6g" JVM opts and
I can see them reflected in Spark UI. Am I missing some memory settings?

    Date taken: Wed Nov 15 10:46:06 MST 2017
    File: /tmp/java_pid69786.hprof
    File size: 59.5 MB

    Total bytes: 39,728,337
    Total classes: 15,749
    Total instances: 437,979
    Classloaders: 123
    GC roots: 2,831
    Number of objects pending for finalization: 5,198


Thanks,

Alec

On Wed, Nov 15, 2017 at 11:15 AM, Vadim Semenov <vadim.semenov@datadoghq.com
> wrote:

> There's a lot of off-heap memory involved in decompressing Snappy,
> compressing ZLib.
>
> Since you're running using `local[*]`, you process multiple tasks
> simultaneously, so they all might consume memory.
>
> I don't think that increasing heap will help, since it looks like you're
> hitting system memory limits.
>
> I'd suggest trying to run with `local[2]` and checking what's the memory
> usage of the jvm process.
>
> On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan <al...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>> format. Effectively, my Java service starts up an embedded Spark cluster
>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>> keep getting OOM errors with large (~1GB) files.
>>
>> I've tried different ways to reduce memory usage, e.g. by partitioning
>> data with dataSet.partitionBy("customer).save(filePath), or capping
>> memory usage by setting spark.executor.memory=1G, but to no vail.
>>
>> I am wondering if there is a way to avoid OOM besides splitting the
>> source JSON file into multiple smaller ones and processing the small ones
>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>> in it's entirety before converting it to ORC (columnar)? If so, would it
>> make sense to create a custom receiver that reads the Snappy file and use
>> Spark streaming for ORC conversion?
>>
>> Thanks,
>>
>> Alec
>>
>
>

Re: Process large JSON file without causing OOM

Posted by Vadim Semenov <va...@datadoghq.com>.
There's a lot of off-heap memory involved in decompressing Snappy,
compressing ZLib.

Since you're running using `local[*]`, you process multiple tasks
simultaneously, so they all might consume memory.

I don't think that increasing heap will help, since it looks like you're
hitting system memory limits.

I'd suggest trying to run with `local[2]` and checking what's the memory
usage of the jvm process.

On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan <al...@gmail.com> wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer).save(filePath), or capping
> memory usage by setting spark.executor.memory=1G, but to no vail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in it's entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark streaming for ORC conversion?
>
> Thanks,
>
> Alec
>

Re: Process large JSON file without causing OOM

Posted by Steve Loughran <st...@hortonworks.com>.

On 14 Nov 2017, at 15:32, Alec Swan <al...@gmail.com>> wrote:

 But I wonder if there is a way to stream/batch the content of JSON file in order to convert it to ORC piecemeal and avoid reading the whole JSON file in memory in the first place?




That is what you'll need to do; you'd hit similar problems if you had the same files, same allocated JVM space and the same # of threads trying to read in the files.

Jackson has a streaming API: http://www.baeldung.com/jackson-streaming-api

Re: Process large JSON file without causing OOM

Posted by Alec Swan <al...@gmail.com>.
Thanks all. I am not submitting a spark job explicitly. Instead, I am using
the Spark library functionality embedded in my web service as shown in the
code I included in the previous email. So, effectively Spark SQL runs in
the web service's JVM. Therefore, --driver-memory option would not (and did
not) work for me.

I did try setting the following environment variables
SPARK_DRIVER_MEMORY=5g;SPARK_EXECUTOR_MEMORY=5g but they didn't have any
effect. Passing "-Dspark.executor.memory=6g  -Dspark.driver.memory=6g" JVM
parameters had the same effect as setting them in SparkConf in the code,
i.e. they showed up in Spark UI but I still got OOM.

My use case is somewhat strange because I just wanted to use Spark SQL
library for it's multi-format (ORC, Parquet, JSON) support but I really
didn't really need the rest of Spark functionality. Should I be considering
submitting my Spark code as a job (to be run locally) from the web service
code?

So far, in this thread we've been focusing on configuring larger memory
pools. But I wonder if there is a way to stream/batch the content of JSON
file in order to convert it to ORC piecemeal and avoid reading the whole
JSON file in memory in the first place?




Thanks,

Alec

On Tue, Nov 14, 2017 at 2:58 AM, Sonal Goyal <so...@gmail.com> wrote:

> If you are running Spark with local[*] as master, there will be a single
> process whose memory will be controlled by --driver-memory command line
> option to spark submit. Check
>
> http://spark.apache.org/docs/latest/configuration.html
>
> spark.driver.memory 1g Amount of memory to use for the driver process,
> i.e. where SparkContext is initialized. (e.g. 1g, 2g).
> *Note:* In client mode, this config must not be set through the SparkConf directly
> in your application, because the driver JVM has already started at that
> point. Instead, please set this through the --driver-memory command line
> option or in your default properties file.
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Tue, Nov 14, 2017 at 9:37 AM, Alec Swan <al...@gmail.com> wrote:
>
>> Hi Joel,
>>
>> Here are the relevant snippets of my code and an OOM error thrown
>> in frameWriter.save(..). Surprisingly, the heap dump is pretty small ~60MB
>> even though I am running with -Xmx10G and 4G executor and driver memory as
>> shown below.
>>
>>         SparkConf sparkConf = new SparkConf()
>>                 .setAppName("My Service")
>>                 .setMaster("local[*]")
>>                 .set("spark.ui.enabled", "true")
>>                 .set("spark.executor.memory", "4G")
>>                 .set("spark.driver.memory", "4G");
>>
>>         sparkSessionBuilder = SparkSession.builder().config(
>> sparkConf).enableHiveSupport();
>>
>>         Dataset<Row> events = sparkSession.read()
>>                 .format("json")
>>                 .schema(inputConfig.getSchema())
>>                 .load(inputFile.getPath());
>>
>>         DataFrameWriter<Row> frameWriter = events.selectExpr(JavaConversi
>> ons.asScalaBuffer(outputSchema.getColumns())) // select "data.customer
>> AS `customer`", ...
>>                 .write()
>>                 .options(outputConfig.getProperties())
>> // compression=zlib
>>                 .format("orc")
>>                 .partitionBy(JavaConversions.a
>> sScalaBuffer(outputSchema.getPartitions())) // partition by "customer"
>>                 .save(outputUri.getPath());
>>
>>
>> Here is the error log I get at runtime:
>>
>> 17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
>> 17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
>> java.lang.OutOfMemoryError: Java heap space
>> Dumping heap to java_pid3790.hprof ...
>> Heap dump file created [62653841 bytes in 2.212 secs]
>> #
>> # java.lang.OutOfMemoryError: Java heap space
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>> #   Executing "kill -9 3790"...
>>
>>
>> And here is the thread from the thread dump that caused OOM:
>>
>> "Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
>> at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
>> at org.apache.hadoop.io.compress.BlockDecompressorStream.getCom
>> pressedData(BlockDecompressorStream.java:123)
>> at org.apache.hadoop.io.compress.BlockDecompressorStream.decomp
>> ress(BlockDecompressorStream.java:98)
>> at org.apache.hadoop.io.compress.DecompressorStream.read(Decomp
>> ressorStream.java:85)
>> at java.io.InputStream.read(InputStream.java:101)
>> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>>    Local Variable: byte[]#3957
>>    Local Variable: org.apache.hadoop.io.compress.
>> BlockDecompressorStream#1
>> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>>    Local Variable: org.apache.hadoop.mapreduce.li
>> b.input.SplitLineReader#1
>>    Local Variable: org.apache.hadoop.io.Text#5
>> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipU
>> tfByteOrderMark(LineRecordReader.java:144)
>> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextK
>> eyValue(LineRecordReader.java:184)
>>    Local Variable: org.apache.hadoop.mapreduce.li
>> b.input.LineRecordReader#1
>> at org.apache.spark.sql.execution.datasources.RecordReaderItera
>> tor.hasNext(RecordReaderIterator.scala:39)
>>    Local Variable: org.apache.spark.sql.execution
>> .datasources.RecordReaderIterator#1
>> at org.apache.spark.sql.execution.datasources.HadoopFileLinesRe
>> ader.hasNext(HadoopFileLinesReader.scala:50)
>>    Local Variable: org.apache.spark.sql.execution
>> .datasources.HadoopFileLinesReader#1
>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>>    Local Variable: scala.collection.Iterator$$anon$12#1
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$
>> anon$1.hasNext(FileScanRDD.scala:105)
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$
>> anon$1.nextIterator(FileScanRDD.scala:177)
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$
>> anon$1.hasNext(FileScanRDD.scala:105)
>>    Local Variable: org.apache.spark.sql.execution
>> .datasources.FileScanRDD$$anon$1#1
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(
>> UnsafeExternalRowSorter.java:190)
>> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(
>> SortExec.scala:108)
>>    Local Variable: org.apache.spark.sql.execution
>> .UnsafeExternalRowSorter#1
>>    Local Variable: org.apache.spark.executor.TaskMetrics#2
>> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(
>> SortExec.scala:101)
>>    Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$
>> anonfun$apply$25.apply(RDD.scala:827)
>> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$
>> anonfun$apply$25.apply(RDD.scala:827)
>>    Local Variable: scala.collection.Iterator$$anon$11#2
>>    Local Variable: org.apache.spark.rdd.RDD$$anon
>> fun$mapPartitionsInternal$1$$anonfun$apply$25#2
>>    Local Variable: java.lang.Integer#1
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>    Local Variable: org.apache.spark.sql.execution
>> .datasources.FilePartition#2
>>    Local Variable: org.apache.spark.storage.StorageLevel#1
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>    Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
>>    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
>>    Local Variable: scala.Tuple2#1572
>>    Local Variable: org.apache.spark.sql.execution
>> .datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
>>    Local Variable: scala.Tuple2#1571
>>    Local Variable: org.apache.spark.TaskContextImpl#1
>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>    Local Variable: org.apache.spark.scheduler.ResultTask#2
>>    Local Variable: org.apache.spark.metrics.MetricsSystem#1
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
>>    Local Variable: org.apache.spark.memory.TaskMemoryManager#1
>>    Local Variable: sun.management.ThreadImpl#1
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>>    Local Variable: java.util.concurrent.ThreadPoolExecutor#6
>>    Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>>    Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> Thanks,
>>
>> Alec
>>
>> On Mon, Nov 13, 2017 at 8:30 PM, Joel D <ga...@gmail.com> wrote:
>>
>>> Have you tried increasing driver, exec mem (gc overhead too if required)?
>>>
>>> your code snippet and stack trace will be helpful.
>>>
>>> On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <al...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>>>> format. Effectively, my Java service starts up an embedded Spark cluster
>>>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>>>> keep getting OOM errors with large (~1GB) files.
>>>>
>>>> I've tried different ways to reduce memory usage, e.g. by partitioning
>>>> data with dataSet.partitionBy("customer).save(filePath), or capping
>>>> memory usage by setting spark.executor.memory=1G, but to no vail.
>>>>
>>>> I am wondering if there is a way to avoid OOM besides splitting the
>>>> source JSON file into multiple smaller ones and processing the small ones
>>>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>>>> in it's entirety before converting it to ORC (columnar)? If so, would it
>>>> make sense to create a custom receiver that reads the Snappy file and use
>>>> Spark streaming for ORC conversion?
>>>>
>>>> Thanks,
>>>>
>>>> Alec
>>>>
>>>>
>>>>
>>>>
>>>>
>>
>

Re: Process large JSON file without causing OOM

Posted by Sonal Goyal <so...@gmail.com>.
If you are running Spark with local[*] as master, there will be a single
process whose memory will be controlled by --driver-memory command line
option to spark submit. Check

http://spark.apache.org/docs/latest/configuration.html

spark.driver.memory 1g Amount of memory to use for the driver process, i.e.
where SparkContext is initialized. (e.g. 1g, 2g).
*Note:* In client mode, this config must not be set through the
SparkConf directly
in your application, because the driver JVM has already started at that
point. Instead, please set this through the --driver-memory command line
option or in your default properties file.

Thanks,
Sonal
Nube Technologies <http://www.nubetech.co>

<http://in.linkedin.com/in/sonalgoyal>



On Tue, Nov 14, 2017 at 9:37 AM, Alec Swan <al...@gmail.com> wrote:

> Hi Joel,
>
> Here are the relevant snippets of my code and an OOM error thrown
> in frameWriter.save(..). Surprisingly, the heap dump is pretty small ~60MB
> even though I am running with -Xmx10G and 4G executor and driver memory as
> shown below.
>
>         SparkConf sparkConf = new SparkConf()
>                 .setAppName("My Service")
>                 .setMaster("local[*]")
>                 .set("spark.ui.enabled", "true")
>                 .set("spark.executor.memory", "4G")
>                 .set("spark.driver.memory", "4G");
>
>         sparkSessionBuilder = SparkSession.builder().config(
> sparkConf).enableHiveSupport();
>
>         Dataset<Row> events = sparkSession.read()
>                 .format("json")
>                 .schema(inputConfig.getSchema())
>                 .load(inputFile.getPath());
>
>         DataFrameWriter<Row> frameWriter = events.selectExpr(
> JavaConversions.asScalaBuffer(outputSchema.getColumns())) //
> select "data.customer AS `customer`", ...
>                 .write()
>                 .options(outputConfig.getProperties()) // compression=zlib
>                 .format("orc")
>                 .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions()))
> // partition by "customer"
>                 .save(outputUri.getPath());
>
>
> Here is the error log I get at runtime:
>
> 17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
> 17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
> java.lang.OutOfMemoryError: Java heap space
> Dumping heap to java_pid3790.hprof ...
> Heap dump file created [62653841 bytes in 2.212 secs]
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing "kill -9 3790"...
>
>
> And here is the thread from the thread dump that caused OOM:
>
> "Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
> at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
> at org.apache.hadoop.io.compress.BlockDecompressorStream.
> getCompressedData(BlockDecompressorStream.java:123)
> at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(
> BlockDecompressorStream.java:98)
> at org.apache.hadoop.io.compress.DecompressorStream.read(
> DecompressorStream.java:85)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>    Local Variable: byte[]#3957
>    Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>    Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
>    Local Variable: org.apache.hadoop.io.Text#5
> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
> skipUtfByteOrderMark(LineRecordReader.java:144)
> at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
> nextKeyValue(LineRecordReader.java:184)
>    Local Variable: org.apache.hadoop.mapreduce.
> lib.input.LineRecordReader#1
> at org.apache.spark.sql.execution.datasources.
> RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>    Local Variable: org.apache.spark.sql.execution.datasources.
> RecordReaderIterator#1
> at org.apache.spark.sql.execution.datasources.
> HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>    Local Variable: org.apache.spark.sql.execution.datasources.
> HadoopFileLinesReader#1
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>    Local Variable: scala.collection.Iterator$$anon$12#1
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.
> nextIterator(FileScanRDD.scala:177)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(
> FileScanRDD.scala:105)
>    Local Variable: org.apache.spark.sql.execution.datasources.
> FileScanRDD$$anon$1#1
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(
> UnsafeExternalRowSorter.java:190)
> at org.apache.spark.sql.execution.SortExec$$anonfun$1.
> apply(SortExec.scala:108)
>    Local Variable: org.apache.spark.sql.execution.
> UnsafeExternalRowSorter#1
>    Local Variable: org.apache.spark.executor.TaskMetrics#2
> at org.apache.spark.sql.execution.SortExec$$anonfun$1.
> apply(SortExec.scala:101)
>    Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:827)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$25.apply(RDD.scala:827)
>    Local Variable: scala.collection.Iterator$$anon$11#2
>    Local Variable: org.apache.spark.rdd.RDD$$
> anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
>    Local Variable: java.lang.Integer#1
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>    Local Variable: org.apache.spark.sql.execution.datasources.
> FilePartition#2
>    Local Variable: org.apache.spark.storage.StorageLevel#1
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>    Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
>    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
>    Local Variable: scala.Tuple2#1572
>    Local Variable: org.apache.spark.sql.execution.datasources.
> FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
>    Local Variable: scala.Tuple2#1571
>    Local Variable: org.apache.spark.TaskContextImpl#1
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>    Local Variable: org.apache.spark.scheduler.ResultTask#2
>    Local Variable: org.apache.spark.metrics.MetricsSystem#1
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>    Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
>    Local Variable: org.apache.spark.memory.TaskMemoryManager#1
>    Local Variable: sun.management.ThreadImpl#1
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>    Local Variable: java.util.concurrent.ThreadPoolExecutor#6
>    Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>    Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> Thanks,
>
> Alec
>
> On Mon, Nov 13, 2017 at 8:30 PM, Joel D <ga...@gmail.com> wrote:
>
>> Have you tried increasing driver, exec mem (gc overhead too if required)?
>>
>> your code snippet and stack trace will be helpful.
>>
>> On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <al...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>>> format. Effectively, my Java service starts up an embedded Spark cluster
>>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>>> keep getting OOM errors with large (~1GB) files.
>>>
>>> I've tried different ways to reduce memory usage, e.g. by partitioning
>>> data with dataSet.partitionBy("customer).save(filePath), or capping
>>> memory usage by setting spark.executor.memory=1G, but to no vail.
>>>
>>> I am wondering if there is a way to avoid OOM besides splitting the
>>> source JSON file into multiple smaller ones and processing the small ones
>>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>>> in it's entirety before converting it to ORC (columnar)? If so, would it
>>> make sense to create a custom receiver that reads the Snappy file and use
>>> Spark streaming for ORC conversion?
>>>
>>> Thanks,
>>>
>>> Alec
>>>
>>>
>>>
>>>
>>>
>

Re: Process large JSON file without causing OOM

Posted by Alec Swan <al...@gmail.com>.
Hi Joel,

Here are the relevant snippets of my code and an OOM error thrown
in frameWriter.save(..). Surprisingly, the heap dump is pretty small ~60MB
even though I am running with -Xmx10G and 4G executor and driver memory as
shown below.

        SparkConf sparkConf = new SparkConf()
                .setAppName("My Service")
                .setMaster("local[*]")
                .set("spark.ui.enabled", "true")
                .set("spark.executor.memory", "4G")
                .set("spark.driver.memory", "4G");

        sparkSessionBuilder =
SparkSession.builder().config(sparkConf).enableHiveSupport();

        Dataset<Row> events = sparkSession.read()
                .format("json")
                .schema(inputConfig.getSchema())
                .load(inputFile.getPath());

        DataFrameWriter<Row> frameWriter =
events.selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns()))
// select "data.customer AS `customer`", ...
                .write()
                .options(outputConfig.getProperties()) // compression=zlib
                .format("orc")

.partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions()))
// partition by "customer"
                .save(outputUri.getPath());


Here is the error log I get at runtime:

17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid3790.hprof ...
Heap dump file created [62653841 bytes in 2.212 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing "kill -9 3790"...


And here is the thread from the thread dump that caused OOM:

"Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
at
org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
at
org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
at
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
   Local Variable: byte[]#3957
   Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
   Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
   Local Variable: org.apache.hadoop.io.Text#5
at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
   Local Variable: org.apache.hadoop.mapreduce.lib.input.LineRecordReader#1
at
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
   Local Variable:
org.apache.spark.sql.execution.datasources.RecordReaderIterator#1
at
org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
   Local Variable:
org.apache.spark.sql.execution.datasources.HadoopFileLinesReader#1
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
   Local Variable: scala.collection.Iterator$$anon$12#1
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
   Local Variable:
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1#1
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
   Local Variable: org.apache.spark.sql.execution.UnsafeExternalRowSorter#1
   Local Variable: org.apache.spark.executor.TaskMetrics#2
at
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
   Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   Local Variable: scala.collection.Iterator$$anon$11#2
   Local Variable:
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
   Local Variable: java.lang.Integer#1
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   Local Variable:
org.apache.spark.sql.execution.datasources.FilePartition#2
   Local Variable: org.apache.spark.storage.StorageLevel#1
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
   Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
   Local Variable: scala.Tuple2#1572
   Local Variable:
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
   Local Variable: scala.Tuple2#1571
   Local Variable: org.apache.spark.TaskContextImpl#1
at org.apache.spark.scheduler.Task.run(Task.scala:108)
   Local Variable: org.apache.spark.scheduler.ResultTask#2
   Local Variable: org.apache.spark.metrics.MetricsSystem#1
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
   Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
   Local Variable: org.apache.spark.memory.TaskMemoryManager#1
   Local Variable: sun.management.ThreadImpl#1
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   Local Variable: java.util.concurrent.ThreadPoolExecutor#6
   Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
at java.lang.Thread.run(Thread.java:745)



Thanks,

Alec

On Mon, Nov 13, 2017 at 8:30 PM, Joel D <ga...@gmail.com> wrote:

> Have you tried increasing driver, exec mem (gc overhead too if required)?
>
> your code snippet and stack trace will be helpful.
>
> On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <al...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>> format. Effectively, my Java service starts up an embedded Spark cluster
>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>> keep getting OOM errors with large (~1GB) files.
>>
>> I've tried different ways to reduce memory usage, e.g. by partitioning
>> data with dataSet.partitionBy("customer).save(filePath), or capping
>> memory usage by setting spark.executor.memory=1G, but to no vail.
>>
>> I am wondering if there is a way to avoid OOM besides splitting the
>> source JSON file into multiple smaller ones and processing the small ones
>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>> in it's entirety before converting it to ORC (columnar)? If so, would it
>> make sense to create a custom receiver that reads the Snappy file and use
>> Spark streaming for ORC conversion?
>>
>> Thanks,
>>
>> Alec
>>
>>
>>
>>
>>

Re: Process large JSON file without causing OOM

Posted by Joel D <ga...@gmail.com>.
Have you tried increasing driver, exec mem (gc overhead too if required)?

your code snippet and stack trace will be helpful.

On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <al...@gmail.com> wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer).save(filePath), or capping memory
> usage by setting spark.executor.memory=1G, but to no vail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in it's entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark streaming for ORC conversion?
>
> Thanks,
>
> Alec
>
>
>
>
>

Re: Process large JSON file without causing OOM

Posted by vaquar khan <va...@gmail.com>.
https://stackoverflow.com/questions/26562033/how-to-set-apache-spark-executor-memory

Regards,
Vaquar khan

On Mon, Nov 13, 2017 at 6:22 PM, Alec Swan <al...@gmail.com> wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer).save(filePath), or capping
> memory usage by setting spark.executor.memory=1G, but to no vail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in it's entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark streaming for ORC conversion?
>
> Thanks,
>
> Alec
>



-- 
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago