Posted to user@spark.apache.org by Brad Miller <bm...@eecs.berkeley.edu> on 2014/04/08 04:37:10 UTC

trouble with "join" on large RDDs

I am running the latest version of PySpark branch-0.9 and having some
trouble with join.

One RDD is about 100G (25GB compressed and serialized in memory) with
130K records, the other RDD is about 10G (2.5G compressed and
serialized in memory) with 330K records.  I load each RDD from HDFS,
invoke keyBy to key each record, and then attempt to join the RDDs.
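
Concretely, the job is essentially the following (a minimal sketch; the
HDFS paths and the key function are placeholders rather than my actual
code):

from pyspark import SparkContext

sc = SparkContext(appName="join-repro")

# Placeholder load and keying logic; the real job reads its own format from HDFS.
big = sc.textFile("hdfs:///data/big")      # ~100G RDD, ~130K records
small = sc.textFile("hdfs:///data/small")  # ~10G RDD, ~330K records

big_keyed = big.keyBy(lambda rec: rec.split("\t")[0])
small_keyed = small.keyBy(lambda rec: rec.split("\t")[0])

# This is the step that consistently crashes at the start of the reduce phase.
joined = big_keyed.join(small_keyed)
joined.count()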

The join consistently crashes at the beginning of the reduce phase.
Note that when joining the 10G RDD to itself there is no problem.

Prior to the crash, several suspicious things happen:

-All map output files from the map phase of the join are written to
spark.local.dir, even though there should be plenty of memory left to
contain the map output.  I am reasonably sure *all* map outputs are
written to disk because the size of the map output spill directory
matches the size of the shuffle write (as displayed in the user
interface) for each machine.

-In the beginning of the reduce phase of the join, memory consumption
on each worker spikes and each machine runs out of memory (as evidenced
by a "Cannot allocate memory" exception in Java).  This is
particularly surprising since each machine has 30G of ram and each
spark worker has only been allowed 10G.

-In the web UI both the "Shuffle Spill (Memory)" and "Shuffle Spill
(Disk)" fields for each machine remain at 0.0 despite shuffle files
being written into spark.local.dir.

Since some of the shuffle files were larger than 10M, I tried setting
spark.akka.frameSize to 1024.  There was no change in the crash
behavior.
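
For reference, I applied the setting when building the context, roughly
like this (the value is in MB; the snippet is illustrative rather than an
exact copy of my code):

from pyspark import SparkConf, SparkContext

# spark.akka.frameSize is specified in MB; the default is 10.
conf = SparkConf().setAppName("join-repro").set("spark.akka.frameSize", "1024")
sc = SparkContext(conf=conf)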

Does anybody have any insight into what may be the problem here?  I
have attached output from both the successful join of the 10G RDD to
itself (join_good) and the failed join of the 10G RDD to the 100G RDD
(join_bad).

Re: trouble with "join" on large RDDs

Posted by Harry Brundage <ha...@harry.me>.
Brad: did you ever manage to figure this out? We're experiencing similar
problems, and have also found that the memory limitations supplied to the
Java side of PySpark don't limit how much memory Python can consume (which
makes sense). 

Have you profiled the datasets you are trying to join? Is there any "skew"
in the data, where a handful of join key values occur far more often than the
rest? Note that the partition for each join key in PySpark is computed by
default with the Python `hash` function, which can behave unexpectedly for
non-builtin values.
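
A quick way to check (a rough sketch; `keyed_rdd` stands in for whichever
keyed RDD you are joining) is to count records per key and look at the
heaviest keys:

# Count records per join key and show the 20 most frequent keys.
key_counts = keyed_rdd.map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)

top_keys = (key_counts
            .map(lambda kv: (kv[1], kv[0]))   # (count, key) so we can sort by count
            .sortByKey(ascending=False)
            .take(20))
print(top_keys)

# If the keys are custom objects, it is also worth checking how hash(key)
# distributes them across partitions.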




Re: trouble with "join" on large RDDs

Posted by Brad Miller <bm...@eecs.berkeley.edu>.
I set SPARK_MEM in the driver process by setting
"spark.executor.memory" to 10G.  Each machine had 32G of RAM and a
dedicated 32G spill volume.  I believe all of the units are in pages,
and the page size is the standard 4K.  There are 15 slave nodes in the
cluster and the sizes of the datasets I'm trying to join are about
2.5G and 25G when serialized and compressed in the RDD cache.
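
For completeness, the context is built roughly as follows (a sketch; only
the memory setting is exact, and the master URL and app name are
placeholders):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("spark://<master-host>:7077")   # standalone cluster, 15 slave nodes
        .setAppName("join-experiment")
        .set("spark.executor.memory", "10g"))
sc = SparkContext(conf=conf)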

I appreciate that Python lacks the kind of heap size controls
available in Java, but I don't have a clear picture of how the different
computational tasks are partitioned between Java and Python in PySpark
(so it's unclear to me how much freedom Python should have to chew
through that much memory).

A couple questions which this raises for me are:
-Are there any parameters I could tune differently to try and prevent
this crashing behavior?  (One guess is sketched after this list.)
-Do we know why this doesn't spill to disk (as Patrick Wendell
mentions that shuffle spill is for aggregations which occur during the
reduce phase)?
-Do we have any hunch about what computation is occurring when the crash occurs?
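
The guess I plan to test (a sketch; `big_keyed` and `small_keyed` stand in
for my two keyed RDDs, and 1024 is an arbitrary partition count) is raising
the number of partitions on the join so that each reduce task, and therefore
each Python worker, holds a smaller slice of the data at once:

# join() accepts an explicit partition count; more partitions mean smaller
# per-task shuffle blocks and less data held by each Python worker at a time.
joined = big_keyed.join(small_keyed, 1024)

# Alternatively, raise the default parallelism when building the context:
# conf.set("spark.default.parallelism", "1024")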

I'd definitely appreciate the insight of others, and am willing to run
experiments and send results/errors/logs out.  Also, I'm physically
located in Soda Hall (Berkeley), so if anyone nearby is interested in
examining this first hand, I am glad to meet up.

best,
-Brad


On Wed, Apr 9, 2014 at 4:21 AM, Andrew Ash <an...@andrewash.com> wrote:
> A JVM can easily be limited in how much memory it uses with the -Xmx
> parameter, but Python doesn't have memory limits built in in such a
> first-class way.  Maybe the memory limits aren't making it to the python
> executors.
>
> What was your SPARK_MEM setting?  The JVM below seems to be using 603201
> (pages?) and the 3 large python processes each are using ~1800000 (pages?).
> I'm unsure of the units that the OOM killer's RSS column is in.  Could be
> either pages (4kb each) or bytes.
>
>
> Apr  8 11:19:19 bennett kernel: [86368.978326] [ 2348]  1002  2348    12573     2102      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978329] [ 2349]  1002  2349    12573     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978332] [ 2350]  1002  2350    12573     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978336] [ 5115]  1002  5115    12571     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978339] [ 5116]  1002  5116    12571     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978341] [ 5117]  1002  5117    12571     2101      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978344] [ 7725]  1002  7725    12570     2098      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978347] [ 7726]  1002  7726    12570     2098      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978350] [ 7727]  1002  7727    12570     2098      22        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978353] [10324]  1002 10324    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978356] [10325]  1002 10325    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978359] [10326]  1002 10326    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978362] [12668]  1002 12668   603201    47932     190        0             0 java
> Apr  8 11:19:19 bennett kernel: [86368.978366] [13295]  1002 13295    12570     2100      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978368] [13296]  1002 13296    12570     2100      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978371] [13297]  1002 13297    12570     2100      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978375] [15192]  1002 15192    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978377] [15193]  1002 15193    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978379] [15195]  1002 15195    12570     2098      23        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978381] [15198]  1002 15198  1845471  1818463    3573        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978383] [15200]  1002 15200  1710479  1686492    3316        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978384] [15201]  1002 15201  1788470  1762344    3463        0             0 python
> Apr  8 11:19:19 bennett kernel: [86368.978386] Out of memory: Kill process 15198 (python) score 221 or sacrifice child
> Apr  8 11:19:19 bennett kernel: [86368.978389] Killed process 15198 (python) total-vm:7381884kB, anon-rss:7273852kB, file-rss:0kB
>
>
> On Tue, Apr 8, 2014 at 2:56 PM, Brad Miller <bm...@eecs.berkeley.edu>
> wrote:
>>
>> Hi All,
>>
>> I poked around a bit more to (1) confirm my suspicions that the crash
>> was related to memory consumption and (2) figure out why there is no
>> error shown in 12_stderr, the spark executor log file from the
>> executors on bennett.research.intel-research.net.
>>
>> The syslog file (from /var/log/syslog on bennett, attached) shows that
>> the machine ran out of memory, the memory was mostly consumed by 1
>> java process and 3 python processes (I am running pyspark with 3 cores
>> per machine), and then the kernel began killing java and python
>> processes to ease memory pressure.  It seems likely that these
>> processes were the spark processes, and there are no errors recorded in
>> 12_stderr because the process was killed by the OS (rather than
>> experiencing an unhandled "cannot allocate memory" exception).
>>
>> I'm a little confused how Spark could consume so much memory during
>> the reduce phase of the shuffle.  Shouldn't Spark remain within the
>> SPARK_MEM limitations on memory consumption, and spill to disk in the
>> event that there isn't enough memory?
>>
>> -Brad
>>
>>
>> On Tue, Apr 8, 2014 at 12:50 PM, Brad Miller <bm...@eecs.berkeley.edu>
>> wrote:
>> > Hi Patrick,
>> >
>> >> The shuffle data is written through the buffer cache of the operating
>> >> system, so you would expect the files to show up there immediately and
>> >> probably to show up as being their full size when you do "ls". In
>> >> reality
>> >> though these are likely residing in the OS cache and not on disk.
>> >
>> > I see.  Perhaps the memory consumption is related to this?
>> >
>> >> Could you paste the error here?
>> >
>> > While I have definitely seen "cannot allocate memory" errors while
>> > trying to do this, I am unable to reproduce one now.  Instead, I am
>> > able to produce "most recent failure: unknown" (see full error
>> > displayed in my iPython session below).  Initially, I assumed there
>> > was some sort of non-determinism which caused the error to
>> > occasionally be "unknown", but now I realize that it may have been a
>> > consistent change which occurred when I updated to the latest
>> > branch-0.9 (previously I was running a version I pulled around March
>> > 10th).
>> >
>> > Py4JJavaError: An error occurred while calling o274.collect.
>> > : org.apache.spark.SparkException: Job aborted: Task 2.0:29 failed 4
>> > times (most recent failure: unknown)
>> >     at
>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
>> >     at
>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
>> >     at
>> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >     at
>> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >     at
>> > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
>> >     at
>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
>> >     at
>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
>> >     at scala.Option.foreach(Option.scala:236)
>> >     at
>> > org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
>> >     at
>> > org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
>> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> >     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> >     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> >     at
>> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> >     at
>> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >     at
>> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >     at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >     at
>> > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >
>> >> From the logs it looks like your executor has died. Would you be able
>> >> to
>> >> paste the log from the executor with the exact failure? It would show
>> >> up in
>> >> the /work directory inside of spark's directory on the cluster.
>> >
>> > I've attached the logging output from the driver from when I re-ran
>> > the join operation this morning.  It seems that specific, individual
>> > workers (or perhaps executors is the right term?) begin to die and
>> > then are re-launched by the master/driver.  When I examine the
>> > app-xxx...xxx folder corresponding to this job on the first worker to
>> > fail (bennett.research.intel-research.net), there are several numbered
>> > folders inside (12, 15, 21, 22, 23) which seem to correspond to each
>> > invocation of the executor as recorded in the driver log.  stdout is
>> > consistently empty, and stderr is not.  I have attached all of these
>> > logs as <executor_id>_stderr.
>> >
>> > Surprisingly, 12_stderr does not record any sort of error, although
>> > 15_stderr, 21_stderr, and 22_stderr do.  These errors are all of the
>> > form:
>> >
>> > 14/04/08 11:19:42 ERROR Executor: Uncaught exception in thread
>> > Thread[stdin writer for python,5,main]
>> > org.apache.spark.FetchFailedException: Fetch failed: null 0 -1 44
>> >     at
>> > org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:316)
>> >     at
>> > org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:314)
>> >     at
>> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >     at
>> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >     at
>> > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >     at
>> > scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >     at
>> > scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> >     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> >     at
>> > org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:313)
>> >     at
>> > org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:139)
>> >     at
>> > org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>> >     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>> >     at
>> > org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>> > Caused by: java.lang.Exception: Missing an output location for shuffle 0
>> >     at
>> > org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:317)
>> >     ... 17 more
>> >
>> > Note that in "Fetch failed: null 0 -1 44" the last numeral varies in
>> > each of the errors.  I have also attached an executor stderr trace
>> > from malkin.research.intel-research.net, which was one of several
>> > executors to crash after bennett.  Notice that this trace does contain
>> > errors, although they seem to be saying that there's trouble
>> > connecting to bennett.
>> >
>> > If anyone would like additional information, please let me know.
>> >
>> > best,
>> > -Brad
>> >
>> > On Mon, Apr 7, 2014 at 9:40 PM, Patrick Wendell <pw...@gmail.com>
>> > wrote:
>> >>
>> >>
>> >>
>> >> On Mon, Apr 7, 2014 at 7:37 PM, Brad Miller
>> >> <bm...@eecs.berkeley.edu>
>> >> wrote:
>> >>>
>> >>> I am running the latest version of PySpark branch-0.9 and having some
>> >>> trouble with join.
>> >>>
>> >>> One RDD is about 100G (25GB compressed and serialized in memory) with
>> >>> 130K records, the other RDD is about 10G (2.5G compressed and
>> >>> serialized in memory) with 330K records.  I load each RDD from HDFS,
>> >>> invoke keyBy to key each record, and then attempt to join the RDDs.
>> >>>
>> >>> The join consistently crashes at the beginning of the reduce phase.
>> >>> Note that when joining the 10G RDD to itself there is no problem.
>> >>>
>> >>> Prior to the crash, several suspicious things happen:
>> >>>
>> >>> -All map output files from the map phase of the join are written to
>> >>> spark.local.dir, even though there should be plenty of memory left to
>> >>> contain the map output.  I am reasonably sure *all* map outputs are
>> >>> written to disk because the size of the map output spill directory
>> >>> matches the size of the shuffle write (as displayed in the user
>> >>> interface) for each machine.
>> >>
>> >>
>> >> The shuffle data is written through the buffer cache of the operating
>> >> system, so you would expect the files to show up there immediately and
>> >> probably to show up as being their full size when you do "ls". In
>> >> reality
>> >> though these are likely residing in the OS cache and not on disk.
>> >>
>> >>>
>> >>> -In the beginning of the reduce phase of the join, memory consumption
>> >>> on each worker spikes and each machine runs out of memory (as evidenced
>> >>> by a "Cannot allocate memory" exception in Java).  This is
>> >>> particularly surprising since each machine has 30G of ram and each
>> >>> spark worker has only been allowed 10G.
>> >>
>> >>
>> >> Could you paste the error here?
>> >>
>> >>>
>> >>> -In the web UI both the "Shuffle Spill (Memory)" and "Shuffle Spill
>> >>> (Disk)" fields for each machine remain at 0.0 despite shuffle files
>> >>> being written into spark.local.dir.
>> >>
>> >>
>> >> Shuffle spill is different than the shuffle files written to
>> >> spark.local.dir. Shuffle spilling is for aggregations that occur on the
>> >> reduce side of the shuffle. A join like this might not see any
>> >> spilling.
>> >>
>> >>
>> >>
>> >> From the logs it looks like your executor has died. Would you be able
>> >> to
>> >> paste the log from the executor with the exact failure? It would show
>> >> up in
>> >> the /work directory inside of spark's directory on the cluster.
>
>

Re: trouble with "join" on large RDDs

Posted by Andrew Ash <an...@andrewash.com>.
A JVM can easily be limited in how much memory it uses with the -Xmx
parameter, but Python doesn't have memory limits built in in such a
first-class way.  Maybe the memory limits aren't making it to the python
executors.

What was your SPARK_MEM setting?  The JVM below seems to be using 603201
(pages?) and the 3 large python processes each are using ~1800000 (pages?).
I'm unsure of the units that the OOM killer's RSS column is in.  Could be
either pages (4kb each) or bytes.


Apr  8 11:19:19 bennett kernel: [86368.978326] [ 2348]  1002  2348    12573     2102      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978329] [ 2349]  1002  2349    12573     2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978332] [ 2350]  1002  2350    12573     2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978336] [ 5115]  1002  5115    12571     2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978339] [ 5116]  1002  5116    12571     2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978341] [ 5117]  1002  5117    12571     2101      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978344] [ 7725]  1002  7725    12570     2098      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978347] [ 7726]  1002  7726    12570     2098      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978350] [ 7727]  1002  7727    12570     2098      22        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978353] [10324]  1002 10324    12570     2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978356] [10325]  1002 10325    12570     2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978359] [10326]  1002 10326    12570     2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978362] [12668]  1002 12668   603201    47932     190        0             0 java
Apr  8 11:19:19 bennett kernel: [86368.978366] [13295]  1002 13295    12570     2100      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978368] [13296]  1002 13296    12570     2100      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978371] [13297]  1002 13297    12570     2100      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978375] [15192]  1002 15192    12570     2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978377] [15193]  1002 15193    12570     2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978379] [15195]  1002 15195    12570     2098      23        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978381] [15198]  1002 15198  1845471  1818463    3573        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978383] [15200]  1002 15200  1710479  1686492    3316        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978384] [15201]  1002 15201  1788470  1762344    3463        0             0 python
Apr  8 11:19:19 bennett kernel: [86368.978386] Out of memory: Kill process 15198 (python) score 221 or sacrifice child
Apr  8 11:19:19 bennett kernel: [86368.978389] Killed process 15198 (python) total-vm:7381884kB, anon-rss:7273852kB, file-rss:0kB
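
Checking the last kill message against the table above suggests the columns
are 4 kB pages: pid 15198 shows 1845471 and 1818463 there, while the kill
line reports total-vm:7381884kB and anon-rss:7273852kB.  A quick sanity
check (numbers copied from the log):

total_vm_pages = 1845471
rss_pages = 1818463

print(total_vm_pages * 4)  # 7381884 -> matches total-vm in kB
print(rss_pages * 4)       # 7273852 -> matches anon-rss in kB

So each of the three large python processes was holding roughly 7 GB of
resident memory.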


On Tue, Apr 8, 2014 at 2:56 PM, Brad Miller <bm...@eecs.berkeley.edu> wrote:

> Hi All,
>
> I poked around a bit more to (1) confirm my suspicions that the crash
> was related to memory consumption and (2) figure out why there is no
> error shown in 12_stderr, the spark executor log file from the
> executors on bennett.research.intel-research.net.
>
> The syslog file (from /var/log/syslog on bennett, attached) shows that
> the machine ran out of memory, the memory was mostly consumed by 1
> java process and 3 python processes (I am running pyspark with 3 cores
> per machine), and then the kernel began killing java and python
> processes to ease memory pressure.  It seems likely that these
> processes were the spark processes, and there are no errors recorded in
> 12_stderr because the process was killed by the OS (rather than
> experiencing an unhandled "cannot allocate memory" exception).
>
> I'm a little confused how Spark could consume so much memory during
> the reduce phase of the shuffle.  Shouldn't Spark remain within the
> SPARK_MEM limitations on memory consumption, and spill to disk in the
> event that there isn't enough memory?
>
> -Brad
>
>
> On Tue, Apr 8, 2014 at 12:50 PM, Brad Miller <bm...@eecs.berkeley.edu>
> wrote:
> > Hi Patrick,
> >
> >> The shuffle data is written through the buffer cache of the operating
> >> system, so you would expect the files to show up there immediately and
> >> probably to show up as being their full size when you do "ls". In
> reality
> >> though these are likely residing in the OS cache and not on disk.
> >
> > I see.  Perhaps the memory consumption is related to this?
> >
> >> Could you paste the error here?
> >
> > While I have definitely seen "cannot allocate memory" errors while
> > trying to do this, I am unable to reproduce one now.  Instead, I am
> > able to produce "most recent failure: unknown" (see full error
> > displayed in my iPython session below).  Initially, I assumed there
> > was some sort of non-determinism which caused the error to
> > occasionally be "unknown", but now I realize that it may have been a
> > consistent change which occurred when I updated to the latest
> > branch-0.9 (previously I was running a version I pulled around March
> > 10th).
> >
> > Py4JJavaError: An error occurred while calling o274.collect.
> > : org.apache.spark.SparkException: Job aborted: Task 2.0:29 failed 4
> > times (most recent failure: unknown)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> >     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> >     at scala.Option.foreach(Option.scala:236)
> >     at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> >     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >     at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >     at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> >> From the logs it looks like your executor has died. Would you be able to
> >> paste the log from the executor with the exact failure? It would show
> up in
> >> the /work directory inside of spark's directory on the cluster.
> >
> > I've attached the logging output from the driver from when I re-ran
> > the join operation this morning.  It seems that specific, individual
> > workers (or perhaps executors is the right term?) begin to die and
> > then are re-launched by the master/driver.  When I examine the
> > app-xxx...xxx folder corresponding to this job on the first worker to
> > fail (bennett.research.intel-research.net), there are several numbered
> > folders inside (12, 15, 21, 22, 23) which seem to correspond to each
> > invocation of the executor as recorded in the driver log.  stdout is
> > consistently empty, and stderr is not.  I have attached all of these
> > logs as <executor_id>_stderr.
> >
> > Surprisingly, 12_stderr does not record any sort of error, although
> > 15_stderr, 21_stderr, and 22_stderr do.  These errors are all of the
> > form:
> >
> > 14/04/08 11:19:42 ERROR Executor: Uncaught exception in thread
> > Thread[stdin writer for python,5,main]
> > org.apache.spark.FetchFailedException: Fetch failed: null 0 -1 44
> >     at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:316)
> >     at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:314)
> >     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >     at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> >     at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> >     at
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:313)
> >     at
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:139)
> >     at
> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
> >     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
> >     at
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> > Caused by: java.lang.Exception: Missing an output location for shuffle 0
> >     at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:317)
> >     ... 17 more
> >
> > Note that in "Fetch failed: null 0 -1 44" the last numeral varies in
> > each of the errors.  I have also attached an executor stderr trace
> > from malkin.research.intel-research.net, which was one of several
> > executors to crash after bennett.  Notice that this trace does contain
> > errors, although they seem to be saying that there's trouble
> > connecting to bennett.
> >
> > If anyone would like additional information, please let me know.
> >
> > best,
> > -Brad
> >
> > On Mon, Apr 7, 2014 at 9:40 PM, Patrick Wendell <pw...@gmail.com>
> wrote:
> >>
> >>
> >>
> >> On Mon, Apr 7, 2014 at 7:37 PM, Brad Miller <bmiller1@eecs.berkeley.edu>
> >> wrote:
> >>>
> >>> I am running the latest version of PySpark branch-0.9 and having some
> >>> trouble with join.
> >>>
> >>> One RDD is about 100G (25GB compressed and serialized in memory) with
> >>> 130K records, the other RDD is about 10G (2.5G compressed and
> >>> serialized in memory) with 330K records.  I load each RDD from HDFS,
> >>> invoke keyBy to key each record, and then attempt to join the RDDs.
> >>>
> >>> The join consistently crashes at the beginning of the reduce phase.
> >>> Note that when joining the 10G RDD to itself there is no problem.
> >>>
> >>> Prior to the crash, several suspicious things happen:
> >>>
> >>> -All map output files from the map phase of the join are written to
> >>> spark.local.dir, even though there should be plenty of memory left to
> >>> contain the map output.  I am reasonably sure *all* map outputs are
> >>> written to disk because the size of the map output spill directory
> >>> matches the size of the shuffle write (as displayed in the user
> >>> interface) for each machine.
> >>
> >>
> >> The shuffle data is written through the buffer cache of the operating
> >> system, so you would expect the files to show up there immediately and
> >> probably to show up as being their full size when you do "ls". In
> reality
> >> though these are likely residing in the OS cache and not on disk.
> >>
> >>>
> >>> -In the beginning of the reduce phase of the join, memory consumption
> >>> on each worker spikes and each machine runs out of memory (as evidenced
> >>> by a "Cannot allocate memory" exception in Java).  This is
> >>> particularly surprising since each machine has 30G of ram and each
> >>> spark worker has only been allowed 10G.
> >>
> >>
> >> Could you paste the error here?
> >>
> >>>
> >>> -In the web UI both the "Shuffle Spill (Memory)" and "Shuffle Spill
> >>> (Disk)" fields for each machine remain at 0.0 despite shuffle files
> >>> being written into spark.local.dir.
> >>
> >>
> >> Shuffle spill is different than the shuffle files written to
> >> spark.local.dir. Shuffle spilling is for aggregations that occur on the
> >> reduce side of the shuffle. A join like this might not see any spilling.
> >>
> >>
> >>
> >> From the logs it looks like your executor has died. Would you be able to
> >> paste the log from the executor with the exact failure? It would show
> up in
> >> the /work directory inside of spark's directory on the cluster.
>

Re: trouble with "join" on large RDDs

Posted by Brad Miller <bm...@eecs.berkeley.edu>.
Hi All,

I poked around a bit more to (1) confirm my suspicions that the crash
was related to memory consumption and (2) figure out why there is no
error shown in 12_stderr, the spark executor log file from the
executors on bennett.research.intel-research.net.

The syslog file (from /var/log/syslog on bennett, attached) shows that
the machine ran out of memory, the memory was mostly consumed by 1
java process and 3 python processes (I am running pyspark with 3 cores
per machine), and then the kernel began killing java and python
processes to ease memory pressure.  It seems likely that these
processes were the spark processes, and there are no errors recorded in
12_stderr because the process was killed by the OS (rather than
experiencing an unhandled "cannot allocate memory" exception).

I'm a little confused how Spark could consume so much memory during
the reduce phase of the shuffle.  Shouldn't Spark remain within the
SPARK_MEM limitations on memory consumption, and spill to disk in the
event that there isn't enough memory?

-Brad


On Tue, Apr 8, 2014 at 12:50 PM, Brad Miller <bm...@eecs.berkeley.edu> wrote:
> Hi Patrick,
>
>> The shuffle data is written through the buffer cache of the operating
>> system, so you would expect the files to show up there immediately and
>> probably to show up as being their full size when you do "ls". In reality
>> though these are likely residing in the OS cache and not on disk.
>
> I see.  Perhaps the memory consumption is related to this?
>
>> Could you paste the error here?
>
> While I have definitely seen "cannot allocate memory" errors while
> trying to do this, I am unable to reproduce one now.  Instead, I am
> able to produce "most recent failure: unknown" (see full error
> displayed in my iPython session below).  Initially, I assumed there
> was some sort of non-determinism which caused the error to
> occasionally be "unknown", but now I realize that it may have been a
> consistent change which occurred when I updated to the latest
> branch-0.9 (previously I was running a version I pulled around March
> 10th).
>
> Py4JJavaError: An error occurred while calling o274.collect.
> : org.apache.spark.SparkException: Job aborted: Task 2.0:29 failed 4
> times (most recent failure: unknown)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>> From the logs it looks like your executor has died. Would you be able to
>> paste the log from the executor with the exact failure? It would show up in
>> the /work directory inside of spark's directory on the cluster.
>
> I've attached the logging output from the driver from when I re-ran
> the join operation this morning.  It seems that specific, individual
> workers (or perhaps executors is the right term?) begin to die and
> then are re-launched by the master/driver.  When I examine the
> app-xxx...xxx folder corresponding to this job on the first worker to
> fail (bennett.research.intel-research.net), there are several numbered
> folders inside (12, 15, 21, 22, 23) which seem to correspond to each
> invocation of the executor as recorded in the driver log.  stdout is
> consistently empty, and stderr is not.  I have attached all of these
> logs as <executor_id>_stderr.
>
> Surprisingly, 12_stderr does not record any sort of error, although
> 15_stderr, 21_stderr, and 22_stderr do.  These errors are all of the
> form:
>
> 14/04/08 11:19:42 ERROR Executor: Uncaught exception in thread
> Thread[stdin writer for python,5,main]
> org.apache.spark.FetchFailedException: Fetch failed: null 0 -1 44
>     at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:316)
>     at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:314)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>     at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:313)
>     at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:139)
>     at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> Caused by: java.lang.Exception: Missing an output location for shuffle 0
>     at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:317)
>     ... 17 more
>
> Note that in "Fetch failed: null 0 -1 44" the last numeral varies in
> each of the errors.  I have also attached an executor stderr trace
> from malkin.research.intel-research.net, which was one of several
> executors to crash after bennett.  Notice that this trace does contain
> errors, although they seem to be saying that there's trouble
> connecting to bennett.
>
> If anyone would like additional information, please let me know.
>
> best,
> -Brad
>
> On Mon, Apr 7, 2014 at 9:40 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>
>>
>>
>> On Mon, Apr 7, 2014 at 7:37 PM, Brad Miller <bm...@eecs.berkeley.edu>
>> wrote:
>>>
>>> I am running the latest version of PySpark branch-0.9 and having some
>>> trouble with join.
>>>
>>> One RDD is about 100G (25GB compressed and serialized in memory) with
>>> 130K records, the other RDD is about 10G (2.5G compressed and
>>> serialized in memory) with 330K records.  I load each RDD from HDFS,
>>> invoke keyBy to key each record, and then attempt to join the RDDs.
>>>
>>> The join consistently crashes at the beginning of the reduce phase.
>>> Note that when joining the 10G RDD to itself there is no problem.
>>>
>>> Prior to the crash, several suspicious things happen:
>>>
>>> -All map output files from the map phase of the join are written to
>>> spark.local.dir, even though there should be plenty of memory left to
>>> contain the map output.  I am reasonably sure *all* map outputs are
>>> written to disk because the size of the map output spill directory
>>> matches the size of the shuffle write (as displayed in the user
>>> interface) for each machine.
>>
>>
>> The shuffle data is written through the buffer cache of the operating
>> system, so you would expect the files to show up there immediately and
>> probably to show up as being their full size when you do "ls". In reality
>> though these are likely residing in the OS cache and not on disk.
>>
>>>
>>> -In the beginning of the reduce phase of the join, memory consumption
>>> on each worker spikes and each machine runs out of memory (as evidenced
>>> by a "Cannot allocate memory" exception in Java).  This is
>>> particularly surprising since each machine has 30G of ram and each
>>> spark worker has only been allowed 10G.
>>
>>
>> Could you paste the error here?
>>
>>>
>>> -In the web UI both the "Shuffle Spill (Memory)" and "Shuffle Spill
>>> (Disk)" fields for each machine remain at 0.0 despite shuffle files
>>> being written into spark.local.dir.
>>
>>
>> Shuffle spill is different than the shuffle files written to
>> spark.local.dir. Shuffle spilling is for aggregations that occur on the
>> reduce side of the shuffle. A join like this might not see any spilling.
>>
>>
>>
>> From the logs it looks like your executor has died. Would you be able to
>> paste the log from the executor with the exact failure? It would show up in
>> the /work directory inside of spark's directory on the cluster.

Re: trouble with "join" on large RDDs

Posted by Brad Miller <bm...@eecs.berkeley.edu>.
Hi Patrick,

> The shuffle data is written through the buffer cache of the operating
> system, so you would expect the files to show up there immediately and
> probably to show up as being their full size when you do "ls". In reality
> though these are likely residing in the OS cache and not on disk.

I see.  Perhaps the memory consumption is related to this?

> Could you paste the error here?

While I have definitely seen "cannot allocate memory" errors while
trying to do this, I am unable to reproduce one now.  Instead, I am
able to produce "most recent failure: unknown" (see full error
displayed in my iPython session below).  Initially, I assumed there
was some sort of non-determinism which caused the error to
occasionally be "unknown", but now I realize that it may have been a
consistent change which occurred when I updated to the latest
branch-0.9 (previously I was running a version I pulled around March
10th).

Py4JJavaError: An error occurred while calling o274.collect.
: org.apache.spark.SparkException: Job aborted: Task 2.0:29 failed 4
times (most recent failure: unknown)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

> From the logs it looks like your executor has died. Would you be able to
> paste the log from the executor with the exact failure? It would show up in
> the /work directory inside of spark's directory on the cluster.

I've attached the logging output from the driver from when I re-ran
the join operation this morning.  It seems that specific, individual
workers (or perhaps executors is the right term?) begin to die and
then are re-launched by the master/driver.  When I examine the
app-xxx...xxx folder corresponding to this job on the first worker to
fail (bennett.research.intel-research.net), there are several numbered
folders inside (12, 15, 21, 22, 23) which seem to correspond to each
invocation of the executor as recorded in the driver log.  stdout is
consistently empty, and stderr is not.  I have attached all of these
logs as <executor_id>_stderr.

Surprisingly, 12_stderr does not record any sort of error, although
15_stderr, 21_stderr, and 22_stderr do.  These errors are all of the
form:

14/04/08 11:19:42 ERROR Executor: Uncaught exception in thread
Thread[stdin writer for python,5,main]
org.apache.spark.FetchFailedException: Fetch failed: null 0 -1 44
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:316)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:314)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:313)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:139)
    at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
Caused by: java.lang.Exception: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:317)
    ... 17 more

Note that in "Fetch failed: null 0 -1 44" the last numeral varies in
each of the errors.  I have also attached an executor stderr trace
from malkin.research.intel-research.net, which was one of several
executors to crash after bennett.  Notice that this trace does contain
errors, although they seem to be saying that there's trouble
connecting to bennett.

If anyone would like additional information, please let me know.

best,
-Brad

On Mon, Apr 7, 2014 at 9:40 PM, Patrick Wendell <pw...@gmail.com> wrote:
>
>
>
> On Mon, Apr 7, 2014 at 7:37 PM, Brad Miller <bm...@eecs.berkeley.edu>
> wrote:
>>
>> I am running the latest version of PySpark branch-0.9 and having some
>> trouble with join.
>>
>> One RDD is about 100G (25GB compressed and serialized in memory) with
>> 130K records, the other RDD is about 10G (2.5G compressed and
>> serialized in memory) with 330K records.  I load each RDD from HDFS,
>> invoke keyBy to key each record, and then attempt to join the RDDs.
>>
>> The join consistently crashes at the beginning of the reduce phase.
>> Note that when joining the 10G RDD to itself there is no problem.
>>
>> Prior to the crash, several suspicious things happen:
>>
>> -All map output files from the map phase of the join are written to
>> spark.local.dir, even though there should be plenty of memory left to
>> contain the map output.  I am reasonably sure *all* map outputs are
>> written to disk because the size of the map output spill directory
>> matches the size of the shuffle write (as displayed in the user
>> interface) for each machine.
>
>
> The shuffle data is written through the buffer cache of the operating
> system, so you would expect the files to show up there immediately and
> probably to show up as being their full size when you do "ls". In reality
> though these are likely residing in the OS cache and not on disk.
>
>>
>> -In the beginning of the reduce phase of the join, memory consumption
>> on each worker spikes and each machine runs out of memory (as evidenced
>> by a "Cannot allocate memory" exception in Java).  This is
>> particularly surprising since each machine has 30G of ram and each
>> spark worker has only been allowed 10G.
>
>
> Could you paste the error here?
>
>>
>> -In the web UI both the "Shuffle Spill (Memory)" and "Shuffle Spill
>> (Disk)" fields for each machine remain at 0.0 despite shuffle files
>> being written into spark.local.dir.
>
>
> Shuffle spill is different than the shuffle files written to
> spark.local.dir. Shuffle spilling is for aggregations that occur on the
> reduce side of the shuffle. A join like this might not see any spilling.
>
>
>
> From the logs it looks like your executor has died. Would you be able to
> paste the log from the executor with the exact failure? It would show up in
> the /work directory inside of spark's directory on the cluster.

Re: trouble with "join" on large RDDs

Posted by Patrick Wendell <pw...@gmail.com>.
On Mon, Apr 7, 2014 at 7:37 PM, Brad Miller <bm...@eecs.berkeley.edu> wrote:

> I am running the latest version of PySpark branch-0.9 and having some
> trouble with join.
>
> One RDD is about 100G (25GB compressed and serialized in memory) with
> 130K records, the other RDD is about 10G (2.5G compressed and
> serialized in memory) with 330K records.  I load each RDD from HDFS,
> invoke keyBy to key each record, and then attempt to join the RDDs.
>
> The join consistently crashes at the beginning of the reduce phase.
> Note that when joining the 10G RDD to itself there is no problem.
>
> Prior to the crash, several suspicious things happen:
>
> -All map output files from the map phase of the join are written to
> spark.local.dir, even though there should be plenty of memory left to
> contain the map output.  I am reasonably sure *all* map outputs are
> written to disk because the size of the map output spill directory
> matches the size of the shuffle write (as displayed in the user
> interface) for each machine.
>

The shuffle data is written through the buffer cache of the operating
system, so you would expect the files to show up there immediately and
probably to show up as being their full size when you do "ls". In reality
though these are likely residing in the OS cache and not on disk.


> -In the beginning of the reduce phase of the join, memory consumption
> on each worker spikes and each machine runs out of memory (as evidenced
> by a "Cannot allocate memory" exception in Java).  This is
> particularly surprising since each machine has 30G of ram and each
> spark worker has only been allowed 10G.
>

Could you paste the error here?


> -In the web UI both the "Shuffle Spill (Memory)" and "Shuffle Spill
> (Disk)" fields for each machine remain at 0.0 despite shuffle files
> being written into spark.local.dir.
>

Shuffle spill is different than the shuffle files written to
spark.local.dir. Shuffle spilling is for aggregations that occur on the
reduce side of the shuffle. A join like this might not see any spilling.
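
For reference, the reduce-side aggregation spilling is governed by settings
along these lines (just a sketch of the relevant knobs, and since a plain
join doesn't aggregate on the reduce side it may not spill here at all):

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.shuffle.spill", "true")           # spill reduce-side aggregations to disk
        .set("spark.shuffle.memoryFraction", "0.3"))  # fraction of the heap used before spilling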



From the logs it looks like your executor has died. Would you be able to
paste the log from the executor with the exact failure? It would show up in
the /work directory inside of spark's directory on the cluster.