Posted to user@spark.apache.org by Domen Grabec <do...@celtra.com> on 2014/03/11 16:35:43 UTC

Re: Out of memory on large RDDs

Hi

I have a Spark cluster with 4 workers, each with 13 GB of RAM. I would like
to process a large data set (it does not fit in memory) that consists of
JSON entries. These are the transformations applied:

// sc is the SparkContext instance
sc.textFile(s3url)                    // read files from S3
  .keyBy(_.parseJson.id)              // key by the id located in the JSON string
  .groupByKey(number_of_group_tasks)  // group by id
  .flatMap { case (key, lines) => /* do some stuff */ }
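
A rough, Spark-free sketch of what the groupByKey step implies may help here.
It assumes keys are assigned to reduce partitions by a non-negative modulo of
the key's hash code, which is similar in spirit to hash partitioning; the
object and method names (ShuffleSketch, partitionFor, groupByKeyLocal) are
made up for illustration and are not Spark APIs. Every record with the same
key has to land in the same bucket, which is why the data must be moved
between workers.

```scala
object ShuffleSketch {
  // Map a key to one of numPartitions buckets using a non-negative modulo
  // of its hash code (an assumption that mirrors hash partitioning).
  def partitionFor(key: String, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Local stand-in for groupByKey(numPartitions): first bucket the records
  // by partition, then group the values by key inside each bucket.
  def groupByKeyLocal(
      records: Seq[(String, String)],
      numPartitions: Int): Map[Int, Map[String, Seq[String]]] =
    records
      .groupBy { case (key, _) => partitionFor(key, numPartitions) }
      .map { case (partition, recs) =>
        partition -> recs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
      }
}
```

In this toy model the number of buckets plays the role of
number_of_group_tasks above: more buckets means smaller groups per task.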

In the web UI I can see the keyBy operation doing a shuffle write. If I
understand correctly, the groupByKey transformation creates a wide RDD
dependency and therefore requires a shuffle write. I have already increased
spark.akka.askTimeout to 30 seconds, and the job still fails with errors on
the workers:

Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
        at akka.dispatch.DefaultPromise.ready(Future.scala:870)
        at akka.dispatch.DefaultPromise.result(Future.scala:874)
        at akka.dispatch.Await$.result(Future.scala:74)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
        ... 25 more
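
The "Caused by" part of the trace is a blocking wait on a future that gives
up once the ask timeout elapses. A minimal sketch of that mechanism, assuming
plain scala.concurrent futures rather than the Akka classes named in the
trace, could look like the following. The names AskTimeoutSketch, slowReply
and ask are hypothetical, and the sleep only stands in for a reply that
arrives late; it says nothing about why the real reply is late.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AskTimeoutSketch {
  // Hypothetical stand-in for a reply that takes delayMs to arrive.
  def slowReply(delayMs: Long): Future[String] = Future {
    Thread.sleep(delayMs)
    "map output locations"
  }

  // Block for at most `timeout`; a late reply becomes Left(error message)
  // instead of an exception propagating to the caller.
  def ask(delayMs: Long, timeout: FiniteDuration): Either[String, String] =
    try Right(Await.result(slowReply(delayMs), timeout))
    catch { case e: TimeoutException => Left(e.getMessage) }
}
```

Calling AskTimeoutSketch.ask(500, 50.millis) yields a Left, while a generous
timeout yields the reply. Raising the timeout only helps if the reply does
eventually arrive.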


Before the error I see log lines like these:

14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them

Can you please help me understand what is going on? Is the whole shuffle
write RDD kept in memory, and when the cluster runs out of memory does it
start garbage collecting and refetching from S3?

If this is the case, does Spark require additional configuration for
effective shuffle writes to disk?

Regards, Domen

Re: Out of memory on large RDDs

Posted by Jianshi Huang <ji...@gmail.com>.
I have the same issue (I'm using the latest 1.1.0-SNAPSHOT).

I've increased my driver memory to 30G, executor memory to 10G,
and spark.akka.askTimeout to 180. Still no good. My other configurations
are:

spark.serializer                        org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.mb          256
spark.shuffle.consolidateFiles          true
spark.shuffle.file.buffer.kb            400
spark.akka.frameSize                    500
spark.akka.timeout                      600
spark.akka.askTimeout                   180
spark.core.connection.auth.wait.timeout 300
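
For reference, the same properties could also be set programmatically. This
is only a sketch: it assumes spark-core is on the classpath, the property
names and values are copied from the list above, and the object name and
application name are placeholders.

```scala
import org.apache.spark.SparkConf

object ConfSketch {
  // Property names and values come from the list above;
  // "shuffle-timeout-debug" is a hypothetical application name.
  val conf: SparkConf = new SparkConf()
    .setAppName("shuffle-timeout-debug")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.mb", "256")
    .set("spark.shuffle.consolidateFiles", "true")
    .set("spark.shuffle.file.buffer.kb", "400")
    .set("spark.akka.frameSize", "500")
    .set("spark.akka.timeout", "600")
    .set("spark.akka.askTimeout", "180")
    .set("spark.core.connection.auth.wait.timeout", "300")
}
```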

However, I was just informed that the YARN cluster I'm using throttles
resources for the default queue. Not sure if it's related.


Jianshi






-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Re: Out of memory on large RDDs

Posted by Andrew Ash <an...@andrewash.com>.
Hi Grega,

Did you ever get this figured out?  I'm observing the same issue in Spark
1.0.2.

For me it happened after 1.5 hours of a large .distinct call, followed by a
.saveAsTextFile().

14/08/26 20:57:43 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 18500
14/08/26 20:57:43 INFO executor.Executor: Running task ID 18500
14/08/26 20:57:43 INFO storage.BlockManager: Found block broadcast_0 locally
14/08/26 20:57:43 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
14/08/26 20:58:13 ERROR executor.Executor: Exception in task ID 18491
org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:108)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:155)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:42)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:105)
        ... 23 more



Re: Out of memory on large RDDs

Posted by Grega Kespret <gr...@celtra.com>.
> Your input data read as RDD may be causing OOM, so thats where you can use different memory configuration. 

We are not getting any OOM exceptions, just Akka future timeouts in MapOutputTracker and failed fetches of shuffle outputs, which are then refetched.

What is the industry practice for debugging such errors?

Questions:
- Why is MapOutputTracker timing out (and how do I debug this properly)?
- What is the task/purpose of MapOutputTracker?
- How can I check per-task object sizes?

Thanks,
Grega

> On 11 Mar 2014, at 18:43, Mayur Rustagi <ma...@gmail.com> wrote:
> 
> Shuffle data is always stored on disk, its unlikely to cause OOM. Your input data read as RDD may be causing OOM, so thats where you can use different memory configuration. 
> 
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi
> 
> 
> 
>> On Tue, Mar 11, 2014 at 9:20 AM, sparrow <do...@celtra.com> wrote:
>> I don't understand how exactly will that help. There are no persisted RDD's in storage. Our input data is ~ 100GB, but output of the flatMap is ~40Mb. The small RDD is then persisted. 
>> 
>> Memory configuration should not affect shuffle data if I understand you correctly?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>> On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User List] <[hidden email]> wrote:
>>> Shuffle data is not kept in memory. Did you try additional memory configurations( https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence) 
>>> 
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi
>>> 
>>> 
>>> 
>>>> On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <[hidden email]> wrote:
>>>> Hi
>>>> 
>>>> I have a spark cluster with 4 workers each with 13GB ram. I would like to process a large data set (does not fit in memory) that consists of JSON entries. These are the transformations applied:
>>>> 
>>>> SparkContext.textFile(s3url). // read files from s3
>>>> keyBy(_.parseJson.id) // key by id that is located in json string
>>>> groupByKey(number_of_group_tasks) //group by id
>>>> flatMap(case (key,lines) => { //do some stuff })
>>>> 
>>>> In the web view I can see a key by operation doing a shuffle write. If I understand correctly the groupByKey transformation creates a wide RDD dependency thus requiring a shuffle write. I have already increased spark.akka.askTimeout to 30 seconds and still job fails with errors on workers:
>>>> 
>>>> Error communicating with MapOutputTracker
>>>>         at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>>>         at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>>>         at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>>>>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>         at java.lang.Thread.run(Thread.java:724)
>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
>>>>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>>         at akka.dispatch.Await$.result(Future.scala:74)
>>>>         at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>>>         ... 25 more
>>>> 
>>>> 
>>>> Before the error I can see this kind of logs:
>>>> 
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>> 
>>>> Can you please help me understand what is going on? Is the whole shuffle write RDD kept in memory and when cluster runs out of memory it starts garbage collecting and re fetching from s3? 
>>>> 
>>>> If this is the case does spark require additional configuration for effective shuffle write to disk?
>>>> 
>>>> Regards, Domen

Re: Out of memory on large RDDs

Posted by Mayur Rustagi <ma...@gmail.com>.
Shuffle data is always stored on disk, so it's unlikely to cause OOM. Your
input data, read as an RDD, may be causing the OOM, so that's where you can
use a different memory configuration.
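
For illustration only, a minimal sketch of what a "different memory configuration" could look like when set through SparkConf. The property names are the ones documented for Spark releases of this era; the object name, application name, and every value are placeholders chosen for the example, not settings recommended anywhere in this thread.

```scala
import org.apache.spark.SparkConf

object MemoryConfSketch {
  // Builds a SparkConf with illustrative memory settings.
  // All values below are assumptions made for this sketch.
  def build(): SparkConf =
    new SparkConf(false)                           // false: do not load spark.* system properties
      .setAppName("memory-conf-sketch")           // hypothetical application name
      .set("spark.executor.memory", "10g")        // heap per executor (workers in the thread have 13GB RAM)
      .set("spark.storage.memoryFraction", "0.3") // fraction of heap used for Spark's memory cache
      .set("spark.akka.askTimeout", "30")         // seconds, the value mentioned in the thread
}
```

The resulting object would be passed to `new SparkContext(conf)`. Whether lowering the storage fraction helps depends on how much of the heap the job actually needs for caching versus task execution.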

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Tue, Mar 11, 2014 at 9:20 AM, sparrow <do...@celtra.com> wrote:

> I don't understand how exactly that will help. There are no persisted
> RDDs in storage. Our input data is ~100GB, but the output of the flatMap
> is ~40MB. The small RDD is then persisted.
>
> Memory configuration should not affect shuffle data, if I understand you
> correctly?

Re: Out of memory on large RDDs

Posted by sparrow <do...@celtra.com>.
I don't understand how exactly that will help. There are no persisted RDDs
in storage. Our input data is ~100GB, but the output of the flatMap is
~40MB. The small RDD is then persisted.

Memory configuration should not affect shuffle data, if I understand you
correctly?




On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User List]
<ml...@n3.nabble.com> wrote:

> Shuffle data is not kept in memory. Did you try the additional memory
> configurations
> (https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)?





Re: Out of memory on large RDDs

Posted by Mayur Rustagi <ma...@gmail.com>.
Shuffle data is not kept in memory. Did you try the additional memory
configurations
(https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)?
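
As a rough sketch of the RDD persistence options the linked guide describes, the snippet below persists an RDD with a storage level that keeps partitions serialized in memory and spills those that do not fit to disk. The object name, the `local[2]` master, and the two stand-in JSON lines are assumptions made for the example; the job in this thread reads its input from S3 instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Persists a small stand-in RDD and returns its element count.
  def countPersisted(sc: SparkContext): Long = {
    // Stand-in data for illustration; not the data set from the thread.
    val lines = sc.parallelize(Seq("""{"id": 1}""", """{"id": 2}"""))
    // Serialized in memory; partitions that do not fit are written to disk
    // rather than recomputed.
    lines.persist(StorageLevel.MEMORY_AND_DISK_SER)
    lines.count()
  }

  def main(args: Array[String]): Unit = {
    // Master URL and app name are placeholders for a local test run.
    val conf = new SparkConf().setMaster("local[2]").setAppName("persist-sketch")
    val sc = new SparkContext(conf)
    try println(countPersisted(sc)) finally sc.stop()
  }
}
```

Other levels such as `StorageLevel.MEMORY_ONLY` or `StorageLevel.DISK_ONLY` trade memory use against recomputation or disk I/O in different ways.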

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>


