Posted to user@spark.apache.org by Brad Miller <bm...@eecs.berkeley.edu> on 2014/06/04 17:23:12 UTC

pyspark join crash

Hi All,

I have experienced some crashing behavior with join in pyspark.  When I
attempt a join with 2000 partitions in the result, the join succeeds, but
when I use only 200 partitions in the result, the join fails with the
message "Job aborted due to stage failure: Master removed our application:
FAILED".

The crash always occurs at the beginning of the shuffle phase.  Based on my
observations, it seems like the workers in the read phase may be fetching
entire blocks from the write phase of the shuffle rather than just the
records necessary to compose the partition the reader is responsible for.
Hence, when there are fewer partitions in the read phase, the worker is
likely to need a record from each of the write partitions and consequently
attempts to load the entire data set into the memory of a single machine
(which then causes the out-of-memory crash I observe in /var/log/syslog).

Can anybody confirm if this is the behavior of pyspark?  I am glad to
supply additional details about my observed behavior upon request.

best,
-Brad

Re: pyspark join crash

Posted by Matei Zaharia <ma...@gmail.com>.
I think the problem is that once unpacked in Python, the objects take considerably more space, as they are stored as Python objects in a Python dictionary. Take a look at python/pyspark/join.py and combineByKey in python/pyspark/rdd.py. We should probably try to store these in serialized form.
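
As a rough illustration of that size gap, here is a minimal sketch in plain Python (it is not PySpark's actual code path). The record shape, the deep_size helper, and the grouping into a dictionary are all assumptions made up for the example; it only compares the pickled size of some records with an approximate in-memory size of the same records once grouped by key.

```python
import pickle
import sys


def deep_size(obj, seen=None):
    """Approximate recursive size in bytes of nested built-in containers."""
    if seen is None:
        seen = set()
    if id(obj) in seen:
        # Count each object once, even if it is referenced several times.
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_size(k, seen) + deep_size(v, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_size(item, seen) for item in obj)
    return size


# Hypothetical (key, value) records; the real dataset's shape is not known.
records = [(i, (float(i), "value-%d" % i)) for i in range(10000)]

# Group values by key in a dictionary, loosely mimicking a reduce-side merge.
grouped = {}
for key, value in records:
    grouped.setdefault(key % 100, []).append(value)

serialized_size = len(pickle.dumps(records, protocol=2))
in_memory_size = deep_size(grouped)

print("pickled bytes:   %d" % serialized_size)
print("in-memory bytes: %d (approximate)" % in_memory_size)
```

The exact ratio depends on the record types and the Python build, so the printed numbers are best treated as indicative only.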

I’m not sure whether there’s a great way to inspect a Python process’s memory, but looking at what consumes memory in a reducer process would be useful.
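
One possible way to look at this, sketched under assumptions: the standard-library tracemalloc module (Python 3.4 and later, so it may not be available in the Python version used in this thread) can report peak traced memory and the source lines that allocated the most. The top_allocations helper and the sample workload below are hypothetical names for illustration, not part of PySpark.

```python
import tracemalloc


def top_allocations(work, limit=5):
    """Run work() under tracemalloc; return (result, peak_bytes, top_stats)."""
    tracemalloc.start()
    try:
        result = work()
        # Take the snapshot while the result is still alive.
        snapshot = tracemalloc.take_snapshot()
        _current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    # Group allocations by source line, largest first.
    stats = snapshot.statistics("lineno")[:limit]
    return result, peak, stats


def sample_workload():
    # Stand-in for the per-key merge a reduce task might perform.
    return {i: [float(i)] * 20 for i in range(5000)}


result, peak, stats = top_allocations(sample_workload)
print("peak traced bytes: %d" % peak)
for stat in stats:
    print(stat)
```

One could, for instance, wrap the body of a function passed to mapPartitions in such a helper, but that wiring is not shown or tested here.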

Matei 


Re: pyspark join crash

Posted by Brad Miller <bm...@eecs.berkeley.edu>.
Hi Matei,

Thanks for the reply and for creating the JIRA. I hear what you're saying,
but to be clear, it still seems like each reduce task is loading
significantly more data than just the records needed for that task.  The
workers seem to load all data from each block containing a record needed by
the reduce task.

I base this hypothesis on the following:
-My dataset is about 100G uncompressed, 22G serialized in memory with
compression enabled
-There are 130K records
-The initial RDD contains 1677 partitions, averaging 60M (uncompressed)
-There are 3 cores per node (each running one reduce task at a time)
-Each node has 32G of memory

Note that I am attempting to join the dataset to itself and I ran this
experiment after caching the dataset in memory with serialization and
compression enabled.

Given these figures, even with only 200 partitions the average output
partition size (uncompressed) would be 1G (the dataset is joined to itself,
giving 200G over 200 partitions), requiring 3G from each machine on average
(3 concurrent tasks per node).  What I observe instead is that the kernel
kills jobs on many of the nodes at nearly the same time, right after the
read phase starts, and memory spikes on all machines.  It seems likely this
would occur on every node if the master did not detect the failures and
stop the job first.

When I attempt the join with 2000 output partitions, it succeeds.  Note
that there are about 65 records per output partition on average, which
means the reader only needs to load input from about 130 blocks (as the
dataset is joined to itself).  Given that the average uncompressed block
size is 60M, even if the entire block were loaded (not just the relevant
record) we would expect about 23G of memory to be used per node on average.
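
The arithmetic in the two paragraphs above can be reproduced with a short sketch. The constants are the figures quoted in this message; decimal units (1G = 1000M) and the function names are assumptions for the example. The last figure printed (whole blocks with 200 partitions) is an extrapolation of the hypothesis, not a number stated in the message, and it is an upper bound that assumes every needed record sits in a different block.

```python
# Figures quoted in the message (decimal units assumed: 1G = 1000M).
RECORDS = 130000
INPUT_PARTITIONS = 1677
BLOCK_MB = 60
TASKS_PER_NODE = 3
JOINED_OUTPUT_GB = 200  # 100G dataset joined to itself
SIDES = 2               # both sides of the self-join


def ideal_node_gb(output_partitions):
    """Memory per node if each task loads only its own output partition."""
    per_partition_gb = JOINED_OUTPUT_GB / output_partitions
    return per_partition_gb * TASKS_PER_NODE


def whole_block_node_gb(output_partitions):
    """Memory per node if each task loads every block holding a needed record.

    Upper bound: assumes each needed record lives in a distinct block.
    """
    records_per_partition = RECORDS / output_partitions
    blocks = min(SIDES * records_per_partition, SIDES * INPUT_PARTITIONS)
    per_task_gb = blocks * BLOCK_MB / 1000
    return per_task_gb * TASKS_PER_NODE


print("200 partitions, own records only: %.1fG per node" % ideal_node_gb(200))
print("2000 partitions, whole blocks:    %.1fG per node" % whole_block_node_gb(2000))
# Extrapolation, not stated in the message:
print("200 partitions, whole blocks:     %.1fG per node" % whole_block_node_gb(200))
```

Against 32G of memory per node, the first two figures fit while the extrapolated third one does not, which is the pattern the hypothesis is meant to explain.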

I began suspecting the behavior of loading entire blocks based on the
logging from the workers (e.g.
"BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty
blocks out of 3354 blocks").  If it is definitely not the case that entire
blocks are loaded from the writers, then it would seem like there is some
significant overhead which is chewing through lots of memory (perhaps
similar to the problem with python broadcast variables chewing through
memory https://spark-project.atlassian.net/browse/SPARK-1065).
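
As a sanity check on the "122 non-empty blocks out of 3354" log line, here is a back-of-the-envelope model. It rests on assumptions that are not in the thread: records are spread uniformly and independently over the 1677 input partitions, and keys are spread evenly over the output partitions. Under that model one can estimate how many input partitions hold at least one record for a given reducer. The function name is hypothetical.

```python
def expected_nonempty_blocks(records_per_side, map_partitions,
                             reduce_partitions, sides=2):
    """Expected count of map outputs holding >= 1 record for one reducer.

    Assumes uniform, independent placement of records (a simplification).
    """
    records_per_reducer = records_per_side / reduce_partitions
    # Probability that a given map partition holds none of this reducer's records.
    p_empty = (1.0 - 1.0 / map_partitions) ** records_per_reducer
    return sides * map_partitions * (1.0 - p_empty)


for reducers in (2000, 200):
    estimate = expected_nonempty_blocks(130000, 1677, reducers)
    print("%4d output partitions: about %.0f non-empty blocks of %d"
          % (reducers, estimate, 2 * 1677))
```

With 2000 output partitions the estimate is in the same range as the 122 blocks reported in the log. This says nothing by itself about how much of each block is transferred; it only models how many blocks are non-empty for a reducer.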

-Brad



Re: pyspark join crash

Posted by Matei Zaharia <ma...@gmail.com>.
In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks.
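
A minimal sketch of "use more tasks", under stated assumptions: the per-task budget, the expansion_factor parameter, and both helper names are hypothetical. The only PySpark call relied on is RDD.join(other, numPartitions), which lets the caller choose the number of output partitions; the helper is duck-typed so the sizing logic can be tried without a cluster.

```python
import math


def partitions_for_budget(total_bytes, per_task_budget_bytes, expansion_factor=1.0):
    """Pick a partition count so each task's share fits a memory budget.

    expansion_factor is a hypothetical fudge factor for how much larger the
    data becomes once unpacked into Python objects.
    """
    needed = total_bytes * expansion_factor / per_task_budget_bytes
    return max(1, int(math.ceil(needed)))


def join_with_partitions(left, right, num_partitions):
    """Join two pair RDDs into the requested number of output partitions."""
    return left.join(right, numPartitions=num_partitions)


# Example: 200G of joined output with an assumed 100M budget per task.
print(partitions_for_budget(200 * 10**9, 100 * 10**6))
```

With these assumed numbers the helper suggests 2000 partitions, which matches the setting reported to succeed in this thread; a different budget or expansion factor would give a different count.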

I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon.

Matei
