You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Vipul Pandey <vi...@gmail.com> on 2014/01/08 08:13:31 UTC

newbie : java.lang.OutOfMemoryError: Java heap space

Hi, 

Disclaimer : Newbie (well, a returning user)

Setup : 
20 nodes
-Dspark.executor.memory=40g  , essentially tons of space for my usecase


Pretty straight forward join between two inputs 
- 17G (distributed in 10 equally sized - 1.7g files)
- 49Mb (1 file)
I just need to join based on the keys and write out values from both as tuples
==================================================================

    val XA = sc.textFile("path-to-17GB") //assume this to be a tab separated key value pair
    val XAMap = XA.map(x => {
      val arr = x.split("\t")
      (arr(0),arr(1))
    })


    val XY = sc.textFile("pathTo49MB")  // assume it to be a tab separated key value pair
    val XYMap = XY.map(x => {
     val arr  = x.split("\t")
      (arr(0), arr(1))
    }).collect.toMap

    val bc = sc.broadcast(XYMap)

    val joined =  XAMap.map(v => {
      (bc.value(v._1),v._2)
    })

    joined.saveAsTextFile("path-to-output")
==================================================================

When i try to save the text file it throws a OOME and my shell quits. 
any clues?

scala>     joined.saveAsTextFile("path-to-output")
Uncaught error from thread [spark-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:2786)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
	at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1823)
	at java.io.ObjectOutputStream.write(ObjectOutputStream.java:670)
	at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:116)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:47)
	at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:78)
	at org.apache.spark.scheduler.cluster.ClusterTaskSetManager.resourceOffer(ClusterTaskSetManager.scala:375)
	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(ClusterScheduler.scala:215)
	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:212)
	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:209)
	at scala.collection.Iterator$class.foreach(Iterator.scala:772)
	at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
	at scala.Enumeration$ValueSet.foreach(Enumeration.scala:234)
	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.cluster.ClusterScheduler.resourceOffers(ClusterScheduler.scala:209)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.makeOffers(StandaloneSchedulerBackend.scala:113)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:91)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:64)
	at akka.actor.Actor$class.apply(Actor.scala:318)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.apply(StandaloneSchedulerBackend.scala:47)
	at akka.actor.ActorCell.invoke(ActorCell.scala:626)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)


Re: newbie : java.lang.OutOfMemoryError: Java heap space

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
 From the stack trace, it looks like the driver program is dying trying 
to serialize data out to the workers. My guess is that whatever machine 
you're running from has a relatively small default maximum heap size and 
trying to broadcast the 49MB file is causing it to run out of memory. I 
don't know if Spark avoids reserializing broadcast data for each worker 
or not, but if it didn't, that would make the explanation even more 
plausible as you might have many copies of it in memory at once.

Are you using the java -Xmx flag to set the JVM's max heap space? It 
seems that you're using spark-shell, for which I think you can still use 
the SPARK_MEM environment variable.

(You could also do the join() through Spark instead of doing it manually 
by broadcasting half the data. This would keep the "large" 49MB data set 
out of the driver and probably avoid any memory issues there. But it 
looks like your approach could be better if you want to avoid shuffling 
the 17GB data set and this is the entirety of your analysis task.)

-Ewen

> Vipul Pandey <ma...@gmail.com>
> January 8, 2014 3:54 PM
> Any idea anyone?  This seems like a pretty basic requirement and i'm 
> sure a minor config change might get it to work.
> I'd appreciate any pointers as i'm blocked on this since last night.
>
> btw, spark version is 0.8.0
>
>
>
>
> Prashant Sharma <ma...@gmail.com>
> January 7, 2014 11:31 PM
> spark version ?
>
>
>
>
>
> -- 
> Prashant
> Vipul Pandey <ma...@gmail.com>
> January 7, 2014 11:13 PM
> Hi,
>
> Disclaimer : Newbie (well, a returning user)
>
> Setup :
> 20 nodes
> -Dspark.executor.memory=40g  , essentially tons of space for my usecase
>
>
> Pretty straight forward join between two inputs
> - 17G (distributed in 10 equally sized - 1.7g files)
> - 49Mb (1 file)
> I just need to join based on the keys and write out values from both 
> as tuples
> ==================================================================
>
>     val XA = sc.textFile("path-to-17GB") //assume this to be a tab 
> separated key value pair
>     val XAMap = XA.map(x => {
>       val arr = x.split("\t")
>       (arr(0),arr(1))
>     })
>
>
>     val XY = sc.textFile("pathTo49MB")  // assume it to be a tab 
> separated key value pair
>     val XYMap = XY.map(x => {
>      val arr  = x.split("\t")
>       (arr(0), arr(1))
>     }).collect.toMap
>
>     val bc = sc.broadcast(XYMap)
>
>     val joined =  XAMap.map(v => {
>       (bc.value(v._1),v._2)
>     })
>
>     joined.saveAsTextFile("path-to-output")
> ==================================================================
>
> When i try to save the text file it throws a OOME and my shell quits.
> any clues?
>
> scala>     joined.saveAsTextFile("path-to-output")
> Uncaught error from thread [spark-akka.actor.default-dispatcher-4] 
> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for 
> ActorSystem[spark]
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:2786)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1823)
> at java.io.ObjectOutputStream.write(ObjectOutputStream.java:670)
> at 
> org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:116)
> at 
> java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
> at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
> at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:47)
> at 
> org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:78)
> at 
> org.apache.spark.scheduler.cluster.ClusterTaskSetManager.resourceOffer(ClusterTaskSetManager.scala:375)
> at 
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(ClusterScheduler.scala:215)
> at 
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:212)
> at 
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:209)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> at scala.Enumeration$ValueSet.foreach(Enumeration.scala:234)
> at 
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
> at 
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at 
> org.apache.spark.scheduler.cluster.ClusterScheduler.resourceOffers(ClusterScheduler.scala:209)
> at 
> org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.makeOffers(StandaloneSchedulerBackend.scala:113)
> at 
> org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:91)
> at 
> org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:64)
> at akka.actor.Actor$class.apply(Actor.scala:318)
> at 
> org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.apply(StandaloneSchedulerBackend.scala:47)
> at akka.actor.ActorCell.invoke(ActorCell.scala:626)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
>

Re: newbie : java.lang.OutOfMemoryError: Java heap space

Posted by Vipul Pandey <vi...@gmail.com>.
Any idea anyone?  This seems like a pretty basic requirement and i'm sure a minor config change might get it to work.
I'd appreciate any pointers as i'm blocked on this since last night. 

btw, spark version is 0.8.0 


On Jan 7, 2014, at 11:31 PM, Prashant Sharma <sc...@gmail.com> wrote:

> spark version ?
> 
> 
> On Wed, Jan 8, 2014 at 12:43 PM, Vipul Pandey <vi...@gmail.com> wrote:
> Hi, 
> 
> Disclaimer : Newbie (well, a returning user)
> 
> Setup : 
> 20 nodes
> -Dspark.executor.memory=40g  , essentially tons of space for my usecase
> 
> 
> Pretty straight forward join between two inputs 
> - 17G (distributed in 10 equally sized - 1.7g files)
> - 49Mb (1 file)
> I just need to join based on the keys and write out values from both as tuples
> ==================================================================
> 
>     val XA = sc.textFile("path-to-17GB") //assume this to be a tab separated key value pair
>     val XAMap = XA.map(x => {
>       val arr = x.split("\t")
>       (arr(0),arr(1))
>     })
> 
> 
>     val XY = sc.textFile("pathTo49MB")  // assume it to be a tab separated key value pair
>     val XYMap = XY.map(x => {
>      val arr  = x.split("\t")
>       (arr(0), arr(1))
>     }).collect.toMap
> 
>     val bc = sc.broadcast(XYMap)
> 
>     val joined =  XAMap.map(v => {
>       (bc.value(v._1),v._2)
>     })
> 
>     joined.saveAsTextFile("path-to-output")
> ==================================================================
> 
> When i try to save the text file it throws a OOME and my shell quits. 
> any clues?
> 
> scala>     joined.saveAsTextFile("path-to-output")
> Uncaught error from thread [spark-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
> java.lang.OutOfMemoryError: Java heap space
> 	at java.util.Arrays.copyOf(Arrays.java:2786)
> 	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
> 	at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1823)
> 	at java.io.ObjectOutputStream.write(ObjectOutputStream.java:670)
> 	at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:116)
> 	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:47)
> 	at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:78)
> 	at org.apache.spark.scheduler.cluster.ClusterTaskSetManager.resourceOffer(ClusterTaskSetManager.scala:375)
> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(ClusterScheduler.scala:215)
> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:212)
> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:209)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> 	at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> 	at scala.Enumeration$ValueSet.foreach(Enumeration.scala:234)
> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at org.apache.spark.scheduler.cluster.ClusterScheduler.resourceOffers(ClusterScheduler.scala:209)
> 	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.makeOffers(StandaloneSchedulerBackend.scala:113)
> 	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:91)
> 	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:64)
> 	at akka.actor.Actor$class.apply(Actor.scala:318)
> 	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.apply(StandaloneSchedulerBackend.scala:47)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:626)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
> 
> 
> 
> 
> -- 
> Prashant


Re: newbie : java.lang.OutOfMemoryError: Java heap space

Posted by Vipul Pandey <vi...@gmail.com>.
0.8.0

Sent from my iPhone

> On Jan 7, 2014, at 11:31 PM, Prashant Sharma <sc...@gmail.com> wrote:
> 
> spark version ?
> 
> 
>> On Wed, Jan 8, 2014 at 12:43 PM, Vipul Pandey <vi...@gmail.com> wrote:
>> Hi, 
>> 
>> Disclaimer : Newbie (well, a returning user)
>> 
>> Setup : 
>> 20 nodes
>> -Dspark.executor.memory=40g  , essentially tons of space for my usecase
>> 
>> 
>> Pretty straight forward join between two inputs 
>> - 17G (distributed in 10 equally sized - 1.7g files)
>> - 49Mb (1 file)
>> I just need to join based on the keys and write out values from both as tuples
>> ==================================================================
>> 
>>     val XA = sc.textFile("path-to-17GB") //assume this to be a tab separated key value pair
>>     val XAMap = XA.map(x => {
>>       val arr = x.split("\t")
>>       (arr(0),arr(1))
>>     })
>> 
>> 
>>     val XY = sc.textFile("pathTo49MB")  // assume it to be a tab separated key value pair
>>     val XYMap = XY.map(x => {
>>      val arr  = x.split("\t")
>>       (arr(0), arr(1))
>>     }).collect.toMap
>> 
>>     val bc = sc.broadcast(XYMap)
>> 
>>     val joined =  XAMap.map(v => {
>>       (bc.value(v._1),v._2)
>>     })
>> 
>>     joined.saveAsTextFile("path-to-output")
>> ==================================================================
>> 
>> When i try to save the text file it throws a OOME and my shell quits. 
>> any clues?
>> 
>> scala>     joined.saveAsTextFile("path-to-output")
>> Uncaught error from thread [spark-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
>> java.lang.OutOfMemoryError: Java heap space
>> 	at java.util.Arrays.copyOf(Arrays.java:2786)
>> 	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
>> 	at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1823)
>> 	at java.io.ObjectOutputStream.write(ObjectOutputStream.java:670)
>> 	at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:116)
>> 	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
>> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
>> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
>> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
>> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:47)
>> 	at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:78)
>> 	at org.apache.spark.scheduler.cluster.ClusterTaskSetManager.resourceOffer(ClusterTaskSetManager.scala:375)
>> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(ClusterScheduler.scala:215)
>> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:212)
>> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:209)
>> 	at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>> 	at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
>> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>> 	at scala.Enumeration$ValueSet.foreach(Enumeration.scala:234)
>> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
>> 	at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
>> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> 	at org.apache.spark.scheduler.cluster.ClusterScheduler.resourceOffers(ClusterScheduler.scala:209)
>> 	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.makeOffers(StandaloneSchedulerBackend.scala:113)
>> 	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:91)
>> 	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:64)
>> 	at akka.actor.Actor$class.apply(Actor.scala:318)
>> 	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.apply(StandaloneSchedulerBackend.scala:47)
>> 	at akka.actor.ActorCell.invoke(ActorCell.scala:626)
>> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
> 
> 
> 
> -- 
> Prashant

Re: newbie : java.lang.OutOfMemoryError: Java heap space

Posted by Prashant Sharma <sc...@gmail.com>.
spark version ?


On Wed, Jan 8, 2014 at 12:43 PM, Vipul Pandey <vi...@gmail.com> wrote:

> Hi,
>
> Disclaimer : Newbie (well, a returning user)
>
> Setup :
> 20 nodes
> -Dspark.executor.memory=40g  , essentially tons of space for my usecase
>
>
> Pretty straight forward join between two inputs
> - 17G (distributed in 10 equally sized - 1.7g files)
> - 49Mb (1 file)
> I just need to join based on the keys and write out values from both as
> tuples
> ==================================================================
>
>     val XA = sc.textFile("path-to-17GB") //assume this to be a tab
> separated key value pair
>     val XAMap = XA.map(x => {
>       val arr = x.split("\t")
>       (arr(0),arr(1))
>     })
>
>
>     val XY = sc.textFile("pathTo49MB")  // assume it to be a tab separated
> key value pair
>     val XYMap = XY.map(x => {
>      val arr  = x.split("\t")
>       (arr(0), arr(1))
>     }).collect.toMap
>
>     val bc = sc.broadcast(XYMap)
>
>     val joined =  XAMap.map(v => {
>       (bc.value(v._1),v._2)
>     })
>
>     joined.saveAsTextFile("path-to-output")
> ==================================================================
>
> When i try to save the text file it throws a OOME and my shell quits.
> any clues?
>
> scala>     joined.saveAsTextFile("path-to-output")
> Uncaught error from thread [spark-akka.actor.default-dispatcher-4]
> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
> ActorSystem[spark]
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:2786)
>  at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
>  at
> java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1823)
> at java.io.ObjectOutputStream.write(ObjectOutputStream.java:670)
>  at
> org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:116)
>  at
> java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
>  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
>  at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:47)
>  at
> org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:78)
> at
> org.apache.spark.scheduler.cluster.ClusterTaskSetManager.resourceOffer(ClusterTaskSetManager.scala:375)
>  at
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(ClusterScheduler.scala:215)
>  at
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:212)
>  at
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:209)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at scala.Enumeration$ValueSet.foreach(Enumeration.scala:234)
>  at
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
>  at
> org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
>  at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at
> org.apache.spark.scheduler.cluster.ClusterScheduler.resourceOffers(ClusterScheduler.scala:209)
>  at
> org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.makeOffers(StandaloneSchedulerBackend.scala:113)
>  at
> org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:91)
>  at
> org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:64)
>  at akka.actor.Actor$class.apply(Actor.scala:318)
> at
> org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.apply(StandaloneSchedulerBackend.scala:47)
>  at akka.actor.ActorCell.invoke(ActorCell.scala:626)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
>
>


-- 
Prashant