You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Stephen Haberman <st...@gmail.com> on 2013/10/26 20:43:53 UTC

oome from blockmanager

Hi,

By dropping spark.shuffle.file.buffer.kb to 10k and using Snappy
(thanks, Aaron), the job I'm trying to run is no longer OOMEing because
of 300k LZF buffers taking up 4g of RAM.

But...now it's OOMEing because BlockManager is taking ~3.5gb of RAM
(which is ~90% of the available heap).

Specifically, it's two ConcurrentHashMaps:

* BlockManager.blockInfo has ~1gb retained, AFAICT from ~5.5 million
  entries of (ShuffleBlockId, (BlockInfo, Long))

* BlockManager's DiskBlockManager.blockToFileSegmentMap has ~2.3gb
  retained, AFAICT from about the same ~5.5 million entries of
  (ShuffleBlockId, (FileSegment, Long)).

The job stalls about 3,000 tasks through a 7,000-partition shuffle that
is loading ~500gb from S3 on 5 m1.large (4gb heap) machines. The job
did a few smaller ~50-partition shuffles before this larger one, but
nothing crazy. It's an on-demand/EMR cluster, in standalone mode. 

Both of these maps are TimeStampedHashMaps, which kind of makes me
shudder, but we have the cleaner disabled which AFAIK is what we want,
because we aren't running long-running streaming jobs. And AFAIU if the
hash map did get cleaned up mid-shuffle, lookups would just start
failing (which was actually happening for this job on Spark 0.7 and is
what prompted us to get around to trying Spark 0.8).

So, I haven't really figured out BlockManager yet--any hints on what we
could do here? More machines? Should there really be this many entries
in it for a shuffle of this size?

I know 5 machines/4gb of RAM isn't a lot, and I could use more if
needed, but I just expected the job to go slower, not OOME.

Also, I technically have a heap dump from a m1.xlarge (~15gb of RAM)
cluster that also OOMEd on the same job, but I can't open the file on
my laptop, so I can't tell if it was OOMEing for this issue or another
one (it was not using snappy, but using 10kb file buffers, so I'm
interested to see what happened to it.)

- Stephen


Re: oome from blockmanager

Posted by Josh Rosen <ro...@gmail.com>.
Off the top of my head, there are a few constant-factor optimizations that
we could make to the blockInfo data structure that might reduce its
per-entry memory usage:

- Key the blockInfo HashMap by BlockId.name instead of BlockId.
- Exploit hierarchy/structure when storing info on shuffle blocks, e.g. Map
from ShuffleId -> MapId -> Array indexed by ReduceId -> Info.  This reduces
the number of entries stored in TimestampedHashMap, where each entry has a
bit of create the map entry objects and record the timestamps.  With this
scheme, all block info for a particular shuffle will be evicted at the same
time, reducing the amount of objects that the cleaner thread needs to scan.
- We might be able to eliminate the "pending" and "failed" boolean fields
by storing negative values in the "size" field.



On Sat, Oct 26, 2013 at 11:43 AM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

> Hi,
>
> By dropping spark.shuffle.file.buffer.kb to 10k and using Snappy
> (thanks, Aaron), the job I'm trying to run is no longer OOMEing because
> of 300k LZF buffers taking up 4g of RAM.
>
> But...now it's OOMEing because BlockManager is taking ~3.5gb of RAM
> (which is ~90% of the available heap).
>
> Specifically, it's two ConcurrentHashMaps:
>
> * BlockManager.blockInfo has ~1gb retained, AFAICT from ~5.5 million
>   entries of (ShuffleBlockId, (BlockInfo, Long))
>
> * BlockManager's DiskBlockManager.blockToFileSegmentMap has ~2.3gb
>   retained, AFAICT from about the same ~5.5 million entries of
>   (ShuffleBlockId, (FileSegment, Long)).
>
> The job stalls about 3,000 tasks through a 7,000-partition shuffle that
> is loading ~500gb from S3 on 5 m1.large (4gb heap) machines. The job
> did a few smaller ~50-partition shuffles before this larger one, but
> nothing crazy. It's an on-demand/EMR cluster, in standalone mode.
>
> Both of these maps are TimeStampedHashMaps, which kind of makes me
> shudder, but we have the cleaner disabled which AFAIK is what we want,
> because we aren't running long-running streaming jobs. And AFAIU if the
> hash map did get cleaned up mid-shuffle, lookups would just start
> failing (which was actually happening for this job on Spark 0.7 and is
> what prompted us to get around to trying Spark 0.8).
>
> So, I haven't really figured out BlockManager yet--any hints on what we
> could do here? More machines? Should there really be this many entries
> in it for a shuffle of this size?
>
> I know 5 machines/4gb of RAM isn't a lot, and I could use more if
> needed, but I just expected the job to go slower, not OOME.
>
> Also, I technically have a heap dump from a m1.xlarge (~15gb of RAM)
> cluster that also OOMEd on the same job, but I can't open the file on
> my laptop, so I can't tell if it was OOMEing for this issue or another
> one (it was not using snappy, but using 10kb file buffers, so I'm
> interested to see what happened to it.)
>
> - Stephen
>
>

Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
Hi guys,

> When you do a shuffle form N map partitions to M reduce partitions, there are
> N * M output blocks created and each one is tracked. That's why all these
> per-block overheads are causing you to OOM.

So, I'm peering into a heap dump from a 18,000-partition shuffle and thought
I would just mention what I'm seeing.

The problem AFAICT is that when performing a single ShuffleMapTask (or two,
since we have two executors), ShuffleBlockManager opens & buffers 18,000 files.
(So I believe this is slightly different than the 18,000*18,000 = total blocks
in BlockManager issue before.)

Each DiskBlockObjectWriter takes ~108k--80k of which is from the
SnappyOutputStream (two 30k byte array buffers) and 25k in the objOut
KryoSerializationStream.

At 18,000 DiskBlockObjectWriters * 108k * 2 executors, that = 3.8gb. We
have a 5.5gb Xmx setting (using m1.larges and reserving 2gb for the OS,
worker, etc.), but I can't quite tell where the other ~2gb went.

So, this is a naive question, but what if I just turned off
compression? I don't really know how much disk space it's saving, but
these buffers take up a non-trivial amount of space.

Tangentially, there are 2.8 million kryo.Registration objects, which is
13% of the total objects; I understand that each file gets its own Kryo
stream, which probably gets its own set of config objects. It makes
sense in the small, but 18,000 (36,000 if counting both executors) Kryo
streams leads to a lot of these. It's not the core issue, but just
mentioning it.

So, I've already got the job rerunning with less partitions, but just
thought I'd send this as an FYI to see if it sparked any potential
optimizations.

Thanks!

- Stephen



Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
It is available in branch-0.8 and master.


On Tue, Nov 5, 2013 at 10:23 AM, Ashish Rangole <ar...@gmail.com> wrote:

> Awesome indeed! Is this available in the main branch now?
>
> Thanks!
> On Nov 5, 2013 9:50 AM, "Stephen Haberman" <st...@gmail.com>
> wrote:
>
>>
>> > As a followup on this, the memory footprint of all shuffle metadata
>> > has been  greatly reduced. For your original workload with 7k
>> > mappers, 7k reducers, and 5 machines, the total metadata size should
>> > have decreased from ~3.3 GB to ~80 MB.
>>
>> Wow! Awesome work, Aaron! Thanks for the amazing quick follow up.
>>
>> - Stephen
>>
>>

Re: oome from blockmanager

Posted by Ashish Rangole <ar...@gmail.com>.
Awesome indeed! Is this available in the main branch now?

Thanks!
On Nov 5, 2013 9:50 AM, "Stephen Haberman" <st...@gmail.com>
wrote:

>
> > As a followup on this, the memory footprint of all shuffle metadata
> > has been  greatly reduced. For your original workload with 7k
> > mappers, 7k reducers, and 5 machines, the total metadata size should
> > have decreased from ~3.3 GB to ~80 MB.
>
> Wow! Awesome work, Aaron! Thanks for the amazing quick follow up.
>
> - Stephen
>
>

Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
> As a followup on this, the memory footprint of all shuffle metadata
> has been  greatly reduced. For your original workload with 7k
> mappers, 7k reducers, and 5 machines, the total metadata size should
> have decreased from ~3.3 GB to ~80 MB.

Wow! Awesome work, Aaron! Thanks for the amazing quick follow up.

- Stephen


Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
As a followup on this, the memory footprint of all shuffle metadata has
been  greatly reduced. For your original workload with 7k mappers, 7k
reducers, and 5 machines, the total metadata size should have decreased
from ~3.3 GB to ~80 MB.


On Tue, Oct 29, 2013 at 9:07 AM, Aaron Davidson <il...@gmail.com> wrote:

> Great! Glad to hear it worked out. Spark definitely has a pain point about
> deciding the right number of partitions, and I think we're going to be
> spending a lot of time trying to reduce that issue.
>
> Currently working on the patch to reduce the shuffle file block overheads,
> but in the meantime, you can set "spark.shuffle.consolidateFiles=false" to
> exchange OOMEs due to too many partitions for worse performance (probably
> an acceptable tradeoff).
>
>
>
> On Mon, Oct 28, 2013 at 2:31 PM, Stephen Haberman <
> stephen.haberman@gmail.com> wrote:
>
>> Hey guys,
>>
>> As a follow up, I raised our target partition size to 600mb (up from
>> 64mb), which split this report's 500gb of tiny S3 files into ~700
>> partitions, and everything ran much smoother.
>>
>> In retrospect, this was the same issue we'd ran into before, having too
>> many partitions, and had previously solved by throwing some guesses at
>> coalesce to make it magically go away.
>>
>> But now I feel like we have a much better understanding of why the
>> numbers need to be what they are, which is great.
>>
>> So, thanks for all the input and helping me understand what's going on.
>>
>> It'd be great to see some of the optimizations to BlockManager happen,
>> but I understand in the end why it needs to track what it does. And I
>> was also admittedly using a small cluster anyway.
>>
>> - Stephen
>>
>>
>

Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
Great! Glad to hear it worked out. Spark definitely has a pain point about
deciding the right number of partitions, and I think we're going to be
spending a lot of time trying to reduce that issue.

Currently working on the patch to reduce the shuffle file block overheads,
but in the meantime, you can set "spark.shuffle.consolidateFiles=false" to
exchange OOMEs due to too many partitions for worse performance (probably
an acceptable tradeoff).



On Mon, Oct 28, 2013 at 2:31 PM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

> Hey guys,
>
> As a follow up, I raised our target partition size to 600mb (up from
> 64mb), which split this report's 500gb of tiny S3 files into ~700
> partitions, and everything ran much smoother.
>
> In retrospect, this was the same issue we'd ran into before, having too
> many partitions, and had previously solved by throwing some guesses at
> coalesce to make it magically go away.
>
> But now I feel like we have a much better understanding of why the
> numbers need to be what they are, which is great.
>
> So, thanks for all the input and helping me understand what's going on.
>
> It'd be great to see some of the optimizations to BlockManager happen,
> but I understand in the end why it needs to track what it does. And I
> was also admittedly using a small cluster anyway.
>
> - Stephen
>
>

Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
Hey guys,

As a follow up, I raised our target partition size to 600mb (up from
64mb), which split this report's 500gb of tiny S3 files into ~700
partitions, and everything ran much smoother.

In retrospect, this was the same issue we'd ran into before, having too
many partitions, and had previously solved by throwing some guesses at
coalesce to make it magically go away.

But now I feel like we have a much better understanding of why the
numbers need to be what they are, which is great.

So, thanks for all the input and helping me understand what's going on.

It'd be great to see some of the optimizations to BlockManager happen,
but I understand in the end why it needs to track what it does. And I
was also admittedly using a small cluster anyway.

- Stephen


Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
Hi Aaron,

> You're precisely correct about your (N*M)/(# machines) shuffle blocks
> per machine. I believe the 5.5 million data structures instead of 9.8
> comes from the fact that the shuffle was only around 50% of the way
> through before it blew up.

Cool, that sounds right. I like it when numbers match up, as it means my mental
might not be horribly wrong. :-)

> However, Spark in general takes the stance that if a partition doesn't fit in
> memory, things may blow up.

Ah, okay. I knew that was the case before, just wasn't sure if it was loosened.

Currently, we have to do some gyrations for this...like if a report wants to
load N files from S3 into an RDD, we total up the size, divide by our desired
partition size (64mb, which is Hadoop's IIRC), and then coalesce on that.

So, that's how we got 7,000 partitions for a month of data (500,000mb / 64mb =
7k partitions). (Without the coalesce, we have lots of tiny log files, so our
number of partitions shot way, way up, which, yeah, was blowing up.)

And, if we were to set spark.default.parallelism to, say, number of machines *
5, so 25 in this case, that would drop down to just 25 partitions, so, in the
naive case where we have all 500gb of data still in the RDD, that'd be 20gb per
partition.

Granted, we could set spark.default.parallelism higher, but it seems hard to
find the right value for a global config variable given that each cogroup will
have a different amount data/existing partitions. That's why we've avoided it so
far, and I guess have just gotten lucky that we've used big enough cluster sizes
to not notice the M*N blow up. (We had also run into Spark slowing down in the
default parallelism was too high--lots of really tiny tasks IIRC.)

Well, darn. I was going to be really excited if Spark could stream RDDs...I had
assumed shuffling was the biggest/only thing that assumed in-memory partitions.

I guess we could bump up our target partition size from 64mb to 500mb or
1gb...at the time, we were getting a lot of partition wonkiness (OOMEs/etc.)
that it seemed other people weren't getting with Spark, and I attributed this to
most people reading data pre-partitioned from HDFS, while all of our data always
comes in via S3 (in tiny gzip files). So I thought matching HDFS
partitions as close as possible would be the safest thing to do.

Thanks for the input--I'll mull over what we should do, and for now try a higher
goal partition size. Any other insights are appreciated.

- Stephen


Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
You're precisely correct about your (N*M)/(# machines) shuffle blocks per
machine. I believe the 5.5 million data structures instead of 9.8 comes
from the fact that the shuffle was only around 50% of the way through
before it blew up.

Technically, the ShuffleMapTask should not require buffering the whole
partition (it does stream it, but compression algorithms like LZF do
significant buffering as you know). However, Spark in general takes the
stance that if a partition doesn't fit in memory, things may blow up. So if
the ShuffleMapTask itself isn't the cause of the OOM, too-large partitions
may cause a preceding or following RDD operation to do so anyway.



On Sat, Oct 26, 2013 at 3:41 PM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

>
> Thanks for all the help, guys.
>
> > When you do a shuffle form N map partitions to M reduce partitions,
> there are
> > N * M output blocks created and each one is tracked.
>
> Okay, that makes sense. I have a few questions then...
>
> If there are N*M output blocks, does that mean that each machine will
> (generally) be responsible for (N*M)/(number of machines) blocks, and so
> the
> BlockManager data structures would have appropriately less data with more
> machines?
>
> (Turns out 7,000*7,000/5=9.8 million which is in the ballpark of the
> estimated 5 million or so entries that were in BlockManager heap dump.)
>
> > Since you only have a few machines, you don't "need" the
> > extra partitions to add more parallelism to the reduce.
>
> True, I am perhaps overly cautious about favoring more partitions...
>
> It seems like previously, when Spark was shuffling a partition in the
> ShuffleMapTask, it buffered all the data in memory. So, even if you
> only had 5 machines, it was important to have lots of tiny slices of
> data, rather than a few big ones, to avoid OOMEs.
>
> ...but, I had forgotten this, but I believe that's no longer the case?
> And Spark/ShuffleMapTask can now fully stream partitions?
>
> ...if so, that seems like a big deal and that one of my first patches
> to have default partitioner prefer max partitions no longer makes as
> much sense, and spark.default.parallelism just became a whole lot more
> useful. As in, it should always be set. (We're not, currently.)
>
> I'll give this another go later tonight/tomorrow with less partitions
> and see what happens.
>
> Thanks again!
>
> - Stephen
>
>

Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
Thanks for all the help, guys.

> When you do a shuffle form N map partitions to M reduce partitions, there are
> N * M output blocks created and each one is tracked.

Okay, that makes sense. I have a few questions then...

If there are N*M output blocks, does that mean that each machine will
(generally) be responsible for (N*M)/(number of machines) blocks, and so the
BlockManager data structures would have appropriately less data with more
machines?

(Turns out 7,000*7,000/5=9.8 million which is in the ballpark of the
estimated 5 million or so entries that were in BlockManager heap dump.)

> Since you only have a few machines, you don't "need" the
> extra partitions to add more parallelism to the reduce.

True, I am perhaps overly cautious about favoring more partitions...

It seems like previously, when Spark was shuffling a partition in the
ShuffleMapTask, it buffered all the data in memory. So, even if you
only had 5 machines, it was important to have lots of tiny slices of
data, rather than a few big ones, to avoid OOMEs.

...but, I had forgotten this, but I believe that's no longer the case?
And Spark/ShuffleMapTask can now fully stream partitions?

...if so, that seems like a big deal and that one of my first patches
to have default partitioner prefer max partitions no longer makes as
much sense, and spark.default.parallelism just became a whole lot more
useful. As in, it should always be set. (We're not, currently.)

I'll give this another go later tonight/tomorrow with less partitions
and see what happens.

Thanks again!

- Stephen


Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
Hi Aaron,

> Clearly either of the latter two solutions [no compression or 1 executor/node]
> will produce a significant slowdown

Just curious, but why would turning off compression lead to a
significant slow down? Just more IO, I guess?

FWIW, the job we'd been discussing with 18k partitions, I tried with a
512mb goal partition size, which led to 2k partitions, and it completed
just fine.

Unfortunately, I tried the 512mb goal partition size with one of our
production jobs, and it blew up with an OOME in the AppendOnlyMap
reducer side of things, which I'm pretty sure means the partition was
now too big to have 2 of them fit in memory. I don't have a heap dump
for that one, but could get one fairly easily.

I'll discuss this with a few guys here; thinking we'll either try no
compression, or move to just using m1.xlarges.

> I am currently investigating shuffle file performance, and thanks to
> your feedback here, I'll additionally investigate the memory
> overheads inherent in shuffling as well.

Cool. Sounds great. Thanks for all the help, I appreciate it.

- Stephen


Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
Thanks for the update, and glad to hear things are working well!


On Sun, Dec 1, 2013 at 1:56 PM, Stephen Haberman <stephen.haberman@gmail.com
> wrote:

>
> > The short term solutions have already been discussed: decrease the
> > number of reducers (and mappers, if you need them to be tied) or
> > potentially turn off compression if Snappy is holding too much buffer
> > space.
>
> Just to follow up with this (sorry for the delay; I was busy/out for
> Thanksgiving), but after chatting about it, we've moved from m1.larges
> to m1.xlarges, and nudged our partition size up (and so partition
> number down) and things are going quite well now.
>
> So, if we had a 40-machine m1.large job, we're now running it with 20
> m1.xlarges, and as expected it takes basically the same time/cost
> (since m1.xlarges have basically twice the resources), which is great.
>
> I'm still tempted to play around with turning off compression, but this is
> no longer a squeaky wheel, so we probably won't actively investigate it.
>
> Thanks for all the help, Aaron/Patrick/the list, we really appreciate
> it.
>
> - Stephen
>
>
>
>

Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
> The short term solutions have already been discussed: decrease the
> number of reducers (and mappers, if you need them to be tied) or
> potentially turn off compression if Snappy is holding too much buffer
> space.

Just to follow up with this (sorry for the delay; I was busy/out for
Thanksgiving), but after chatting about it, we've moved from m1.larges
to m1.xlarges, and nudged our partition size up (and so partition
number down) and things are going quite well now.

So, if we had a 40-machine m1.large job, we're now running it with 20
m1.xlarges, and as expected it takes basically the same time/cost
(since m1.xlarges have basically twice the resources), which is great.

I'm still tempted to play around with turning off compression, but this is
no longer a squeaky wheel, so we probably won't actively investigate it.

Thanks for all the help, Aaron/Patrick/the list, we really appreciate
it.

- Stephen




Re: oome from blockmanager

Posted by 邵赛赛 <sa...@gmail.com>.
Hi Aaron, I've noticed that this is a spark configuration. When I ran the
app I use the default settings, which means buffer is 100KB and compression
is LZO.

Theoretically 4.8GB memory will cost for FastBufferedOutputStream, what I
saw from the Jvisualvm's memory sampler is nearly twice than 4.8G memory
allocated on byte array and hardly be gc-ed. Also when data size increases
these byte arrays' total usage will reach to about 19G. So I'm gussesing
some other place will also allocate byte array like compressed output
stream buffer.

I have dumped the heap at that moment and will keep investigating the
memory consumption. I will also try to reduce the buffer size or turn off
compression on  shuffle output.

Thanks for your advice.
Jerry
在 2013年11月23日 上午6:22,"Aaron Davidson" <il...@gmail.com>写道:

> Jerry, I need to correct what I said about the 100KB for
> each FastBufferedOutputStream -- this is actually a Spark buffer, not a
> compression buffer. The size can be configured using the
> "spark.shuffle.file.buffer.kb" System property, and it defaults to 100. I
> am still curious if you're using compression or seeing more than 48k
> DiskBlockObjectWriters to account for the remaining memory used.
>
>
> On Fri, Nov 22, 2013 at 9:05 AM, Aaron Davidson <il...@gmail.com>wrote:
>
>> Great, thanks for the feedback. It sounds like you're using the LZF
>> compression scheme -- switching to Snappy should see significantly less
>> buffer space used up per DiskBlockObjectWriter, but this doesn't really
>> solve the underlying problem. In general I've been thinking of "Spark
>> nodes" as having high memory and a moderate number of cores, but with 24
>> cores and 40GB of memory, each core really doesn't get that much memory
>> individually, despite every one needing its own set of
>> DiskBlockObjectWriters.
>>
>> One thing that is a little odd is that with your numbers, you should have
>> 2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should
>> only require a total of 4.8GB for the entire node, though, rather than 80%
>> of your JVM memory. Were you seeing significantly more than 48k
>> DiskBlockObjectWriters?
>>
>>
>> On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai <sa...@intel.com>wrote:
>>
>>>  Hi Aaron,
>>>
>>>
>>>
>>> I’ve also met the same problem that shuffle takes so much overhead for
>>> large number of partitions. I think it is an important issue when
>>> processing large data.
>>>
>>>
>>>
>>> In my case I have 2000 mapper and 2000 reducers,  I dump the memory of
>>> executor and found that byte array takes about 80% of total jvm memory,
>>>  which are referred by FastBufferedOutputStream, and created by
>>> DiskBlockObjectWriter. It seems that there are so many instances of
>>> DiskBlockObjectWriter and each DiskBlockObjectWriter will has 100KB buffer
>>> for FastBufferedOutputStream by default. These buffers are persisted
>>> through task execution period and cannot be garbage collected unless task
>>> is finished.
>>>
>>>
>>>
>>> My cluster has 6 nodes, and 40G memory and 24 core per node, I tried
>>> with 5000 partitions, this will easily got OOM.
>>>
>>>
>>>
>>> What a dilemma is that my application needs groupByKey transformation
>>> which requires small partitions size, but small partition size will lead to
>>> more partition numbers that also consumes lots of memory.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Jerry
>>>
>>>
>>>
>>> *From:* Aaron Davidson [mailto:ilikerps@gmail.com]
>>> *Sent:* Friday, November 22, 2013 2:54 PM
>>> *To:* user@spark.incubator.apache.org
>>> *Subject:* Re: oome from blockmanager
>>>
>>>
>>>
>>> Thanks for your feedback; I think this is a very important issue on the
>>> usability front. One thing to consider is that at some data size, one
>>> simply needs larger or more nodes. m1.large is essentially the smallest ec2
>>> instance size that can run a Spark job of any reasonable size. That's not
>>> an excuse for an OOM, really -- one should generally just see (heavily)
>>> degraded performance instead of actually failing the job. Additionally, the
>>> number of open files scales with the number of reducers in Spark, rather
>>> than, say, Map Reduce, where each mapper only writes to one file, at the
>>> cost of later sorting the entire thing. This unfortunately means that
>>> adding nodes isn't really a full solution in your case, since each one
>>> would try to have 36k compressed output streams open.
>>>
>>>
>>>
>>> The short term solutions have already been discussed: decrease the
>>> number of reducers (and mappers, if you need them to be tied) or
>>> potentially turn off compression if Snappy is holding too much buffer
>>> space. A third option would actually be to decrease the number of executors
>>> per node to 1, since that would double the available memory and roughly
>>> halve the usage. Clearly either of the latter two solutions will produce a
>>> significant slowdown, while the first should keep the same or better
>>> performance. While Spark is good at handling a large number of partitions,
>>> there is still some cost to schedule every task, as well as to store and
>>> forward the metadata for every shuffle block (which grows with R * M), so
>>> the ideal partition size is one that fits exactly into memory without
>>> OOMing -- although this is of course an unrealistic situation to aim for.
>>>
>>>
>>>
>>> The longer term solutions include algorithms which degrade gracefully
>>> instead of OOMing (although this would be a solution for too-large
>>> partitions instead of too-little, where the metadata and buffering becomes
>>> the issue) and to potentially adopt a more Map-Reducey style of shuffling
>>> where we would only need to write to 1 file per executor at a time, with
>>> some significant processing and disk bandwidth cost. I am currently
>>> investigating shuffle file performance, and thanks to your feedback here,
>>> I'll additionally investigate the memory overheads inherent in shuffling as
>>> well.
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <
>>> stephen.haberman@gmail.com> wrote:
>>>
>>>
>>> > More significant in shuffling data is the number of reducers
>>>
>>> Makes sense.
>>>
>>>
>>> > so the lower bound on the number of reducers is 1.1TB/8GB = 138
>>>
>>> This seems slightly optimistic. My math would be: m1.large = 7.5gb
>>> total, leave
>>> 2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus
>>> say Spark
>>> will need 20% or so as metadata/overhead, so ~2gb actually available to
>>> each
>>> executor to put our working data in memory.
>>>
>>> But the 1.1tb of data is compressed, say with a 50% reduction. And we
>>> wrap a
>>> case class around each line to abstract away the parsing logic, and, as
>>> you say,
>>> Java instances will be a good deal bigger than the raw data they
>>> encapsulate.
>>> Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once
>>> uncompressed
>>> and loaded as Java objects, would likely fit in RAM.
>>>
>>> 1.1tb/.3gb = 3666 reducers.
>>>
>>> Perhaps I'm being pessmistic, but an 8gb partition size seems way high.
>>> Are
>>> other Spark users really using partitions this large?
>>>
>>> I'll admit our current value of 64mb is probably way low. We had seen a
>>> lot of OOMEs when first using Spark, due to having too many partitions
>>> (one per file loaded from S3). When writing our "auto coalesce" logic,
>>> I didn't know a good partition size to shoot for, but had read that
>>> HDFS used 64mb blocks.
>>>
>>> I thought we'd get the most parity with regular Spark/HDFS users by
>>> using the same value, so that's what we went with. Perhaps this was
>>> a bad assumption?
>>>
>>>
>>> > So a key question for you is, how many reducers did you use in this
>>> > task?
>>>
>>> 18,000. Yes, I know that seems naive.
>>>
>>> As an explanation, we prefer for our reports to not have/require any
>>> manual partitioning hints from the programmer. Our theory is that, once
>>> the data is loaded and we make a good guessimiate about partitioning
>>> (which is handled by a utility library that knows our goal partition
>>> size), the report logic itself just shouldn't care.
>>>
>>> So, in this case, the report is just cogrouping the 18k partition RDD
>>> with
>>> another RDD, and since we don't have spark.default.parallelism set, the
>>> resulting RDD is also 18k partitions.
>>>
>>> To us, this seems like the only safe default behavior; if the map-side
>>> RDD was
>>> correctly partitioned into 18k, and any fewer partitions would (in
>>> theory) risk
>>> OOMEs, then the reduce-side RDD should have the same number of
>>> partitions,
>>> because it will have, for a cogroup, data from multiple RDDs, not just
>>> the
>>> biggest upstream RDD.
>>>
>>> We would like to avoid having the report hard-code partition size
>>> overrides into a few/all of it cogroup calls--how would the report know
>>> what value to hard code? What date range is it currently being ran for?
>>> How much data is really there for this run?
>>>
>>> Also, I'm generally cautious about dropping the number of partitions
>>> too low, because my impression is that Spark excels at/prefers lots of
>>> small tasks, since its architecture allows it to schedule/move/recover
>>> them quickly.
>>>
>>>
>>> > I'll also be very interested so see any heap dumps
>>>
>>> Sure! I followed up with Aaron offlist.
>>>
>>> - Stephen
>>>
>>>
>>>
>>
>>
>

Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
Jerry, I need to correct what I said about the 100KB for
each FastBufferedOutputStream -- this is actually a Spark buffer, not a
compression buffer. The size can be configured using the
"spark.shuffle.file.buffer.kb" System property, and it defaults to 100. I
am still curious if you're using compression or seeing more than 48k
DiskBlockObjectWriters to account for the remaining memory used.


On Fri, Nov 22, 2013 at 9:05 AM, Aaron Davidson <il...@gmail.com> wrote:

> Great, thanks for the feedback. It sounds like you're using the LZF
> compression scheme -- switching to Snappy should see significantly less
> buffer space used up per DiskBlockObjectWriter, but this doesn't really
> solve the underlying problem. In general I've been thinking of "Spark
> nodes" as having high memory and a moderate number of cores, but with 24
> cores and 40GB of memory, each core really doesn't get that much memory
> individually, despite every one needing its own set of
> DiskBlockObjectWriters.
>
> One thing that is a little odd is that with your numbers, you should have
> 2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should
> only require a total of 4.8GB for the entire node, though, rather than 80%
> of your JVM memory. Were you seeing significantly more than 48k
> DiskBlockObjectWriters?
>
>
> On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai <sa...@intel.com>wrote:
>
>>  Hi Aaron,
>>
>>
>>
>> I’ve also met the same problem that shuffle takes so much overhead for
>> large number of partitions. I think it is an important issue when
>> processing large data.
>>
>>
>>
>> In my case I have 2000 mapper and 2000 reducers,  I dump the memory of
>> executor and found that byte array takes about 80% of total jvm memory,
>>  which are referred by FastBufferedOutputStream, and created by
>> DiskBlockObjectWriter. It seems that there are so many instances of
>> DiskBlockObjectWriter and each DiskBlockObjectWriter will has 100KB buffer
>> for FastBufferedOutputStream by default. These buffers are persisted
>> through task execution period and cannot be garbage collected unless task
>> is finished.
>>
>>
>>
>> My cluster has 6 nodes, and 40G memory and 24 core per node, I tried with
>> 5000 partitions, this will easily got OOM.
>>
>>
>>
>> What a dilemma is that my application needs groupByKey transformation
>> which requires small partitions size, but small partition size will lead to
>> more partition numbers that also consumes lots of memory.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Aaron Davidson [mailto:ilikerps@gmail.com]
>> *Sent:* Friday, November 22, 2013 2:54 PM
>> *To:* user@spark.incubator.apache.org
>> *Subject:* Re: oome from blockmanager
>>
>>
>>
>> Thanks for your feedback; I think this is a very important issue on the
>> usability front. One thing to consider is that at some data size, one
>> simply needs larger or more nodes. m1.large is essentially the smallest ec2
>> instance size that can run a Spark job of any reasonable size. That's not
>> an excuse for an OOM, really -- one should generally just see (heavily)
>> degraded performance instead of actually failing the job. Additionally, the
>> number of open files scales with the number of reducers in Spark, rather
>> than, say, Map Reduce, where each mapper only writes to one file, at the
>> cost of later sorting the entire thing. This unfortunately means that
>> adding nodes isn't really a full solution in your case, since each one
>> would try to have 36k compressed output streams open.
>>
>>
>>
>> The short term solutions have already been discussed: decrease the number
>> of reducers (and mappers, if you need them to be tied) or potentially turn
>> off compression if Snappy is holding too much buffer space. A third option
>> would actually be to decrease the number of executors per node to 1, since
>> that would double the available memory and roughly halve the usage. Clearly
>> either of the latter two solutions will produce a significant slowdown,
>> while the first should keep the same or better performance. While Spark is
>> good at handling a large number of partitions, there is still some cost to
>> schedule every task, as well as to store and forward the metadata for every
>> shuffle block (which grows with R * M), so the ideal partition size is one
>> that fits exactly into memory without OOMing -- although this is of course
>> an unrealistic situation to aim for.
>>
>>
>>
>> The longer term solutions include algorithms which degrade gracefully
>> instead of OOMing (although this would be a solution for too-large
>> partitions instead of too-little, where the metadata and buffering becomes
>> the issue) and to potentially adopt a more Map-Reducey style of shuffling
>> where we would only need to write to 1 file per executor at a time, with
>> some significant processing and disk bandwidth cost. I am currently
>> investigating shuffle file performance, and thanks to your feedback here,
>> I'll additionally investigate the memory overheads inherent in shuffling as
>> well.
>>
>>
>>
>>
>>
>> On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <
>> stephen.haberman@gmail.com> wrote:
>>
>>
>> > More significant in shuffling data is the number of reducers
>>
>> Makes sense.
>>
>>
>> > so the lower bound on the number of reducers is 1.1TB/8GB = 138
>>
>> This seems slightly optimistic. My math would be: m1.large = 7.5gb total,
>> leave
>> 2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus say
>> Spark
>> will need 20% or so as metadata/overhead, so ~2gb actually available to
>> each
>> executor to put our working data in memory.
>>
>> But the 1.1tb of data is compressed, say with a 50% reduction. And we
>> wrap a
>> case class around each line to abstract away the parsing logic, and, as
>> you say,
>> Java instances will be a good deal bigger than the raw data they
>> encapsulate.
>> Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once
>> uncompressed
>> and loaded as Java objects, would likely fit in RAM.
>>
>> 1.1tb/.3gb = 3666 reducers.
>>
>> Perhaps I'm being pessmistic, but an 8gb partition size seems way high.
>> Are
>> other Spark users really using partitions this large?
>>
>> I'll admit our current value of 64mb is probably way low. We had seen a
>> lot of OOMEs when first using Spark, due to having too many partitions
>> (one per file loaded from S3). When writing our "auto coalesce" logic,
>> I didn't know a good partition size to shoot for, but had read that
>> HDFS used 64mb blocks.
>>
>> I thought we'd get the most parity with regular Spark/HDFS users by
>> using the same value, so that's what we went with. Perhaps this was
>> a bad assumption?
>>
>>
>> > So a key question for you is, how many reducers did you use in this
>> > task?
>>
>> 18,000. Yes, I know that seems naive.
>>
>> As an explanation, we prefer for our reports to not have/require any
>> manual partitioning hints from the programmer. Our theory is that, once
>> the data is loaded and we make a good guessimiate about partitioning
>> (which is handled by a utility library that knows our goal partition
>> size), the report logic itself just shouldn't care.
>>
>> So, in this case, the report is just cogrouping the 18k partition RDD with
>> another RDD, and since we don't have spark.default.parallelism set, the
>> resulting RDD is also 18k partitions.
>>
>> To us, this seems like the only safe default behavior; if the map-side
>> RDD was
>> correctly partitioned into 18k, and any fewer partitions would (in
>> theory) risk
>> OOMEs, then the reduce-side RDD should have the same number of partitions,
>> because it will have, for a cogroup, data from multiple RDDs, not just the
>> biggest upstream RDD.
>>
>> We would like to avoid having the report hard-code partition size
>> overrides into a few/all of it cogroup calls--how would the report know
>> what value to hard code? What date range is it currently being ran for?
>> How much data is really there for this run?
>>
>> Also, I'm generally cautious about dropping the number of partitions
>> too low, because my impression is that Spark excels at/prefers lots of
>> small tasks, since its architecture allows it to schedule/move/recover
>> them quickly.
>>
>>
>> > I'll also be very interested so see any heap dumps
>>
>> Sure! I followed up with Aaron offlist.
>>
>> - Stephen
>>
>>
>>
>
>

Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
Great, thanks for the feedback. It sounds like you're using the LZF
compression scheme -- switching to Snappy should see significantly less
buffer space used up per DiskBlockObjectWriter, but this doesn't really
solve the underlying problem. In general I've been thinking of "Spark
nodes" as having high memory and a moderate number of cores, but with 24
cores and 40GB of memory, each core really doesn't get that much memory
individually, despite every one needing its own set of
DiskBlockObjectWriters.

One thing that is a little odd is that with your numbers, you should have
2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should
only require a total of 4.8GB for the entire node, though, rather than 80%
of your JVM memory. Were you seeing significantly more than 48k
DiskBlockObjectWriters?


On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai <sa...@intel.com> wrote:

>  Hi Aaron,
>
>
>
> I’ve also met the same problem that shuffle takes so much overhead for
> large number of partitions. I think it is an important issue when
> processing large data.
>
>
>
> In my case I have 2000 mapper and 2000 reducers,  I dump the memory of
> executor and found that byte array takes about 80% of total jvm memory,
>  which are referred by FastBufferedOutputStream, and created by
> DiskBlockObjectWriter. It seems that there are so many instances of
> DiskBlockObjectWriter and each DiskBlockObjectWriter will has 100KB buffer
> for FastBufferedOutputStream by default. These buffers are persisted
> through task execution period and cannot be garbage collected unless task
> is finished.
>
>
>
> My cluster has 6 nodes, and 40G memory and 24 core per node, I tried with
> 5000 partitions, this will easily got OOM.
>
>
>
> What a dilemma is that my application needs groupByKey transformation
> which requires small partitions size, but small partition size will lead to
> more partition numbers that also consumes lots of memory.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Aaron Davidson [mailto:ilikerps@gmail.com]
> *Sent:* Friday, November 22, 2013 2:54 PM
> *To:* user@spark.incubator.apache.org
> *Subject:* Re: oome from blockmanager
>
>
>
> Thanks for your feedback; I think this is a very important issue on the
> usability front. One thing to consider is that at some data size, one
> simply needs larger or more nodes. m1.large is essentially the smallest ec2
> instance size that can run a Spark job of any reasonable size. That's not
> an excuse for an OOM, really -- one should generally just see (heavily)
> degraded performance instead of actually failing the job. Additionally, the
> number of open files scales with the number of reducers in Spark, rather
> than, say, Map Reduce, where each mapper only writes to one file, at the
> cost of later sorting the entire thing. This unfortunately means that
> adding nodes isn't really a full solution in your case, since each one
> would try to have 36k compressed output streams open.
>
>
>
> The short term solutions have already been discussed: decrease the number
> of reducers (and mappers, if you need them to be tied) or potentially turn
> off compression if Snappy is holding too much buffer space. A third option
> would actually be to decrease the number of executors per node to 1, since
> that would double the available memory and roughly halve the usage. Clearly
> either of the latter two solutions will produce a significant slowdown,
> while the first should keep the same or better performance. While Spark is
> good at handling a large number of partitions, there is still some cost to
> schedule every task, as well as to store and forward the metadata for every
> shuffle block (which grows with R * M), so the ideal partition size is one
> that fits exactly into memory without OOMing -- although this is of course
> an unrealistic situation to aim for.
>
>
>
> The longer term solutions include algorithms which degrade gracefully
> instead of OOMing (although this would be a solution for too-large
> partitions instead of too-little, where the metadata and buffering becomes
> the issue) and to potentially adopt a more Map-Reducey style of shuffling
> where we would only need to write to 1 file per executor at a time, with
> some significant processing and disk bandwidth cost. I am currently
> investigating shuffle file performance, and thanks to your feedback here,
> I'll additionally investigate the memory overheads inherent in shuffling as
> well.
>
>
>
>
>
> On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <
> stephen.haberman@gmail.com> wrote:
>
>
> > More significant in shuffling data is the number of reducers
>
> Makes sense.
>
>
> > so the lower bound on the number of reducers is 1.1TB/8GB = 138
>
> This seems slightly optimistic. My math would be: m1.large = 7.5gb total,
> leave
> 2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus say
> Spark
> will need 20% or so as metadata/overhead, so ~2gb actually available to
> each
> executor to put our working data in memory.
>
> But the 1.1tb of data is compressed, say with a 50% reduction. And we wrap
> a
> case class around each line to abstract away the parsing logic, and, as
> you say,
> Java instances will be a good deal bigger than the raw data they
> encapsulate.
> Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once
> uncompressed
> and loaded as Java objects, would likely fit in RAM.
>
> 1.1tb/.3gb = 3666 reducers.
>
> Perhaps I'm being pessmistic, but an 8gb partition size seems way high. Are
> other Spark users really using partitions this large?
>
> I'll admit our current value of 64mb is probably way low. We had seen a
> lot of OOMEs when first using Spark, due to having too many partitions
> (one per file loaded from S3). When writing our "auto coalesce" logic,
> I didn't know a good partition size to shoot for, but had read that
> HDFS used 64mb blocks.
>
> I thought we'd get the most parity with regular Spark/HDFS users by
> using the same value, so that's what we went with. Perhaps this was
> a bad assumption?
>
>
> > So a key question for you is, how many reducers did you use in this
> > task?
>
> 18,000. Yes, I know that seems naive.
>
> As an explanation, we prefer for our reports to not have/require any
> manual partitioning hints from the programmer. Our theory is that, once
> the data is loaded and we make a good guessimiate about partitioning
> (which is handled by a utility library that knows our goal partition
> size), the report logic itself just shouldn't care.
>
> So, in this case, the report is just cogrouping the 18k partition RDD with
> another RDD, and since we don't have spark.default.parallelism set, the
> resulting RDD is also 18k partitions.
>
> To us, this seems like the only safe default behavior; if the map-side RDD
> was
> correctly partitioned into 18k, and any fewer partitions would (in theory)
> risk
> OOMEs, then the reduce-side RDD should have the same number of partitions,
> because it will have, for a cogroup, data from multiple RDDs, not just the
> biggest upstream RDD.
>
> We would like to avoid having the report hard-code partition size
> overrides into a few/all of it cogroup calls--how would the report know
> what value to hard code? What date range is it currently being ran for?
> How much data is really there for this run?
>
> Also, I'm generally cautious about dropping the number of partitions
> too low, because my impression is that Spark excels at/prefers lots of
> small tasks, since its architecture allows it to schedule/move/recover
> them quickly.
>
>
> > I'll also be very interested so see any heap dumps
>
> Sure! I followed up with Aaron offlist.
>
> - Stephen
>
>
>

RE: oome from blockmanager

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi Aaron,

I've also met the same problem that shuffle takes so much overhead for large number of partitions. I think it is an important issue when processing large data.

In my case I have 2000 mapper and 2000 reducers,  I dump the memory of executor and found that byte array takes about 80% of total jvm memory,  which are referred by FastBufferedOutputStream, and created by DiskBlockObjectWriter. It seems that there are so many instances of DiskBlockObjectWriter and each DiskBlockObjectWriter will has 100KB buffer for FastBufferedOutputStream by default. These buffers are persisted through task execution period and cannot be garbage collected unless task is finished.

My cluster has 6 nodes, and 40G memory and 24 core per node, I tried with 5000 partitions, this will easily got OOM.

What a dilemma is that my application needs groupByKey transformation which requires small partitions size, but small partition size will lead to more partition numbers that also consumes lots of memory.

Thanks
Jerry

From: Aaron Davidson [mailto:ilikerps@gmail.com]
Sent: Friday, November 22, 2013 2:54 PM
To: user@spark.incubator.apache.org
Subject: Re: oome from blockmanager

Thanks for your feedback; I think this is a very important issue on the usability front. One thing to consider is that at some data size, one simply needs larger or more nodes. m1.large is essentially the smallest ec2 instance size that can run a Spark job of any reasonable size. That's not an excuse for an OOM, really -- one should generally just see (heavily) degraded performance instead of actually failing the job. Additionally, the number of open files scales with the number of reducers in Spark, rather than, say, Map Reduce, where each mapper only writes to one file, at the cost of later sorting the entire thing. This unfortunately means that adding nodes isn't really a full solution in your case, since each one would try to have 36k compressed output streams open.

The short term solutions have already been discussed: decrease the number of reducers (and mappers, if you need them to be tied) or potentially turn off compression if Snappy is holding too much buffer space. A third option would actually be to decrease the number of executors per node to 1, since that would double the available memory and roughly halve the usage. Clearly either of the latter two solutions will produce a significant slowdown, while the first should keep the same or better performance. While Spark is good at handling a large number of partitions, there is still some cost to schedule every task, as well as to store and forward the metadata for every shuffle block (which grows with R * M), so the ideal partition size is one that fits exactly into memory without OOMing -- although this is of course an unrealistic situation to aim for.

The longer term solutions include algorithms which degrade gracefully instead of OOMing (although this would be a solution for too-large partitions instead of too-little, where the metadata and buffering becomes the issue) and to potentially adopt a more Map-Reducey style of shuffling where we would only need to write to 1 file per executor at a time, with some significant processing and disk bandwidth cost. I am currently investigating shuffle file performance, and thanks to your feedback here, I'll additionally investigate the memory overheads inherent in shuffling as well.


On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <st...@gmail.com>> wrote:

> More significant in shuffling data is the number of reducers
Makes sense.

> so the lower bound on the number of reducers is 1.1TB/8GB = 138
This seems slightly optimistic. My math would be: m1.large = 7.5gb total, leave
2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus say Spark
will need 20% or so as metadata/overhead, so ~2gb actually available to each
executor to put our working data in memory.

But the 1.1tb of data is compressed, say with a 50% reduction. And we wrap a
case class around each line to abstract away the parsing logic, and, as you say,
Java instances will be a good deal bigger than the raw data they encapsulate.
Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once uncompressed
and loaded as Java objects, would likely fit in RAM.

1.1tb/.3gb = 3666 reducers.

Perhaps I'm being pessmistic, but an 8gb partition size seems way high. Are
other Spark users really using partitions this large?

I'll admit our current value of 64mb is probably way low. We had seen a
lot of OOMEs when first using Spark, due to having too many partitions
(one per file loaded from S3). When writing our "auto coalesce" logic,
I didn't know a good partition size to shoot for, but had read that
HDFS used 64mb blocks.

I thought we'd get the most parity with regular Spark/HDFS users by
using the same value, so that's what we went with. Perhaps this was
a bad assumption?

> So a key question for you is, how many reducers did you use in this
> task?
18,000. Yes, I know that seems naive.

As an explanation, we prefer for our reports to not have/require any
manual partitioning hints from the programmer. Our theory is that, once
the data is loaded and we make a good guessimiate about partitioning
(which is handled by a utility library that knows our goal partition
size), the report logic itself just shouldn't care.

So, in this case, the report is just cogrouping the 18k partition RDD with
another RDD, and since we don't have spark.default.parallelism set, the
resulting RDD is also 18k partitions.

To us, this seems like the only safe default behavior; if the map-side RDD was
correctly partitioned into 18k, and any fewer partitions would (in theory) risk
OOMEs, then the reduce-side RDD should have the same number of partitions,
because it will have, for a cogroup, data from multiple RDDs, not just the
biggest upstream RDD.

We would like to avoid having the report hard-code partition size
overrides into a few/all of it cogroup calls--how would the report know
what value to hard code? What date range is it currently being ran for?
How much data is really there for this run?

Also, I'm generally cautious about dropping the number of partitions
too low, because my impression is that Spark excels at/prefers lots of
small tasks, since its architecture allows it to schedule/move/recover
them quickly.

> I'll also be very interested so see any heap dumps
Sure! I followed up with Aaron offlist.

- Stephen


Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
Thanks for your feedback; I think this is a very important issue on the
usability front. One thing to consider is that at some data size, one
simply needs larger or more nodes. m1.large is essentially the smallest ec2
instance size that can run a Spark job of any reasonable size. That's not
an excuse for an OOM, really -- one should generally just see (heavily)
degraded performance instead of actually failing the job. Additionally, the
number of open files scales with the number of reducers in Spark, rather
than, say, Map Reduce, where each mapper only writes to one file, at the
cost of later sorting the entire thing. This unfortunately means that
adding nodes isn't really a full solution in your case, since each one
would try to have 36k compressed output streams open.

The short term solutions have already been discussed: decrease the number
of reducers (and mappers, if you need them to be tied) or potentially turn
off compression if Snappy is holding too much buffer space. A third option
would actually be to decrease the number of executors per node to 1, since
that would double the available memory and roughly halve the usage. Clearly
either of the latter two solutions will produce a significant slowdown,
while the first should keep the same or better performance. While Spark is
good at handling a large number of partitions, there is still some cost to
schedule every task, as well as to store and forward the metadata for every
shuffle block (which grows with R * M), so the ideal partition size is one
that fits exactly into memory without OOMing -- although this is of course
an unrealistic situation to aim for.

The longer term solutions include algorithms which degrade gracefully
instead of OOMing (although this would be a solution for too-large
partitions instead of too-little, where the metadata and buffering becomes
the issue) and to potentially adopt a more Map-Reducey style of shuffling
where we would only need to write to 1 file per executor at a time, with
some significant processing and disk bandwidth cost. I am currently
investigating shuffle file performance, and thanks to your feedback here,
I'll additionally investigate the memory overheads inherent in shuffling as
well.



On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

>
> > More significant in shuffling data is the number of reducers
>
> Makes sense.
>
> > so the lower bound on the number of reducers is 1.1TB/8GB = 138
>
> This seems slightly optimistic. My math would be: m1.large = 7.5gb total,
> leave
> 2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus say
> Spark
> will need 20% or so as metadata/overhead, so ~2gb actually available to
> each
> executor to put our working data in memory.
>
> But the 1.1tb of data is compressed, say with a 50% reduction. And we wrap
> a
> case class around each line to abstract away the parsing logic, and, as
> you say,
> Java instances will be a good deal bigger than the raw data they
> encapsulate.
> Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once
> uncompressed
> and loaded as Java objects, would likely fit in RAM.
>
> 1.1tb/.3gb = 3666 reducers.
>
> Perhaps I'm being pessmistic, but an 8gb partition size seems way high. Are
> other Spark users really using partitions this large?
>
> I'll admit our current value of 64mb is probably way low. We had seen a
> lot of OOMEs when first using Spark, due to having too many partitions
> (one per file loaded from S3). When writing our "auto coalesce" logic,
> I didn't know a good partition size to shoot for, but had read that
> HDFS used 64mb blocks.
>
> I thought we'd get the most parity with regular Spark/HDFS users by
> using the same value, so that's what we went with. Perhaps this was
> a bad assumption?
>
> > So a key question for you is, how many reducers did you use in this
> > task?
>
> 18,000. Yes, I know that seems naive.
>
> As an explanation, we prefer for our reports to not have/require any
> manual partitioning hints from the programmer. Our theory is that, once
> the data is loaded and we make a good guessimiate about partitioning
> (which is handled by a utility library that knows our goal partition
> size), the report logic itself just shouldn't care.
>
> So, in this case, the report is just cogrouping the 18k partition RDD with
> another RDD, and since we don't have spark.default.parallelism set, the
> resulting RDD is also 18k partitions.
>
> To us, this seems like the only safe default behavior; if the map-side RDD
> was
> correctly partitioned into 18k, and any fewer partitions would (in theory)
> risk
> OOMEs, then the reduce-side RDD should have the same number of partitions,
> because it will have, for a cogroup, data from multiple RDDs, not just the
> biggest upstream RDD.
>
> We would like to avoid having the report hard-code partition size
> overrides into a few/all of it cogroup calls--how would the report know
> what value to hard code? What date range is it currently being ran for?
> How much data is really there for this run?
>
> Also, I'm generally cautious about dropping the number of partitions
> too low, because my impression is that Spark excels at/prefers lots of
> small tasks, since its architecture allows it to schedule/move/recover
> them quickly.
>
> > I'll also be very interested so see any heap dumps
>
> Sure! I followed up with Aaron offlist.
>
> - Stephen
>
>

Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
> More significant in shuffling data is the number of reducers

Makes sense.

> so the lower bound on the number of reducers is 1.1TB/8GB = 138

This seems slightly optimistic. My math would be: m1.large = 7.5gb total, leave
2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus say Spark
will need 20% or so as metadata/overhead, so ~2gb actually available to each
executor to put our working data in memory.

But the 1.1tb of data is compressed, say with a 50% reduction. And we wrap a
case class around each line to abstract away the parsing logic, and, as you say,
Java instances will be a good deal bigger than the raw data they encapsulate.
Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once uncompressed
and loaded as Java objects, would likely fit in RAM.

1.1tb/.3gb = 3666 reducers.

Perhaps I'm being pessmistic, but an 8gb partition size seems way high. Are
other Spark users really using partitions this large?

I'll admit our current value of 64mb is probably way low. We had seen a
lot of OOMEs when first using Spark, due to having too many partitions
(one per file loaded from S3). When writing our "auto coalesce" logic,
I didn't know a good partition size to shoot for, but had read that
HDFS used 64mb blocks.

I thought we'd get the most parity with regular Spark/HDFS users by
using the same value, so that's what we went with. Perhaps this was
a bad assumption?

> So a key question for you is, how many reducers did you use in this
> task?

18,000. Yes, I know that seems naive.

As an explanation, we prefer for our reports to not have/require any
manual partitioning hints from the programmer. Our theory is that, once
the data is loaded and we make a good guessimiate about partitioning
(which is handled by a utility library that knows our goal partition
size), the report logic itself just shouldn't care.

So, in this case, the report is just cogrouping the 18k partition RDD with
another RDD, and since we don't have spark.default.parallelism set, the
resulting RDD is also 18k partitions.

To us, this seems like the only safe default behavior; if the map-side RDD was
correctly partitioned into 18k, and any fewer partitions would (in theory) risk
OOMEs, then the reduce-side RDD should have the same number of partitions,
because it will have, for a cogroup, data from multiple RDDs, not just the
biggest upstream RDD.

We would like to avoid having the report hard-code partition size
overrides into a few/all of it cogroup calls--how would the report know
what value to hard code? What date range is it currently being ran for?
How much data is really there for this run?

Also, I'm generally cautious about dropping the number of partitions
too low, because my impression is that Spark excels at/prefers lots of
small tasks, since its architecture allows it to schedule/move/recover
them quickly.

> I'll also be very interested so see any heap dumps

Sure! I followed up with Aaron offlist.

- Stephen


Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
There is distinction to be made between the number of incoming partitions
and the number of reducers. Let's say that the number of partitions is more
or less irrelevant, especially since we don't directly control the number
of partitions in the input data set (as you pointed out). More significant
in shuffling data is the number of *reducers, *or output partitions for the
shuffle.

To be 100% clear on what I mean, a shuffle completely redistributes input
partitions to output partitions, so you might start with 67k incoming
partitions, but if you have 10k reducers, you'll end up with 10k partitions
after the reduce. Every output partition should fit in memory since some of
Spark's reducing operations require the partition to fit in memory.
However, you can fully tune the number of reducers such that each output
partition fits into memory.

In your particular case, let's say you expect around 1.1TB to make it to
the reducers, and additionally that you have 8GB of RAM per core of the
executor. We need to ensure each output partition fits in memory, so the
lower bound on the number of reducers is 1.1TB/8GB = 138. Any fewer than
that and we would expect the reducers to OOM. Additionally, since we expect
some, possibly very significant, Java overhead, we may want 2-4 times as
many reducers to ensure they fit in memory.

While we now have a lower bound on the number of reducers, it also turns
out that we want to minimize the number of reducers, because each reducer
has significant metadata associated with it. In particular, the total
amount of metadata per machine for a shuffle is approximately 8*M*R/N bytes
for M mappers, R reducers, and N machines. So if we used 67k *reducers* as
well as incoming partitions (and 5 machines), we'd expect the metadata to
take 8*67k*67k/5 = 7.2 GB. If, however, we used only 1000 reducers (which
is 7x the lower bound), we'd expect the metadata to take only 100 MB per
machine.

This balancing act for the number of reducers is not fun. Ideally, we
wouldn't be so sensitive to your decisions on the number of partitions and
reducers, but this is future work.

So a key question for you is, how many reducers did you use in this task?
I'll also be very interested so see any heap dumps, as it's quite possible
a bug is causing more memory to be used somewhere than expected, so I'd
like to corroborate what you're seeing with what we expect.


On Thu, Nov 21, 2013 at 1:00 PM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

> Hi Patrick/Aaron,
>
> Sorry to revive this thread, but we're seeing some OOMEs errors again
> (running with master a few commits after Aaron's optimizations). I
> can tweak our job, but I just wanted to ask some clarifications.
>
> > Another way to fix it is to modify your job to create fewer
> > partitions.
>
> So, I get the impression that most people are not having OOMEs like we
> are...why is that? Do we really have significantly more partitions than
> most people use?
>
> When Spark loads data directly from Hadoop/HDFS (which we don't do),
> AFAIK the default partition size is 64mb, which surely results in a
> large number of partitions (>10k?) for many data sets?
>
> When this job loads from S3, 1 month of data was originally 67,000
> files (1 file per partition), but then we have a routine that coalesces
> it down to ~64mb partitions, which for this job meant 18,000 partitions.
>
> 18k partitions * 64mb/partition = ~1.1tb, which matches how much data
> is in S3.
>
> How many partitions would usually be a good idea for 1tb of data?
>
> I was generally under the impression, what with the Sparrow/etc. slides
> I had come across, that smaller partition sizes were better
> scheduled/retried/etc. anyway.
>
> So, yeah, I can manually partition this job down to a lower number, or
> adjust our auto-coalescing to shoot for larger partitions, but I was
> just hoping to get some feedback about what the Spark team considers an
> acceptable number of partitions, both in general and for a ~1tb size
> RDD.
>
> I'll send a separate email with the specifics of the OOME/heap dump.
>
> Thanks!
>
> - Stephen
>
>

Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
Hi Patrick/Aaron,

Sorry to revive this thread, but we're seeing some OOMEs errors again
(running with master a few commits after Aaron's optimizations). I
can tweak our job, but I just wanted to ask some clarifications.

> Another way to fix it is to modify your job to create fewer 
> partitions.

So, I get the impression that most people are not having OOMEs like we
are...why is that? Do we really have significantly more partitions than
most people use?

When Spark loads data directly from Hadoop/HDFS (which we don't do),
AFAIK the default partition size is 64mb, which surely results in a
large number of partitions (>10k?) for many data sets?

When this job loads from S3, 1 month of data was originally 67,000
files (1 file per partition), but then we have a routine that coalesces
it down to ~64mb partitions, which for this job meant 18,000 partitions.

18k partitions * 64mb/partition = ~1.1tb, which matches how much data
is in S3.

How many partitions would usually be a good idea for 1tb of data?

I was generally under the impression, what with the Sparrow/etc. slides
I had come across, that smaller partition sizes were better
scheduled/retried/etc. anyway.

So, yeah, I can manually partition this job down to a lower number, or
adjust our auto-coalescing to shoot for larger partitions, but I was
just hoping to get some feedback about what the Spark team considers an
acceptable number of partitions, both in general and for a ~1tb size
RDD.

I'll send a separate email with the specifics of the OOME/heap dump.

Thanks!

- Stephen


Re: oome from blockmanager

Posted by Patrick Wendell <pw...@gmail.com>.
Also just to clarify my answer. We definitely need to do the mentioned
optimizations because it can definitely happen that you legitimately need
thousands of mappers and reduce splits on each machine.

I was just suggesting to also play around with the partitions in this case,
since the dataset may not need so many partitions.


On Sat, Oct 26, 2013 at 2:33 PM, Patrick Wendell <pw...@gmail.com> wrote:

> Hey Stephen,
>
> The issue is this. When you do a shuffle form N map partitions to M reduce
> partitions, there are N * M output blocks created and each one is tracked.
> That's why all these per-block overheads are causing you to OOM.
>
> One way to fix this is to reduce the per-block overhead which Josh and
> Aaron are discussing. It actually used to be even worse, we used to also
> create N * M files, but Aaron recently submitted a patch to decrease the
> number of files.
>
> Another way to fix it is to modify your job to create fewer partitions.
> Try coalesce'ing the input into fewer partitions before you shuffle. For
> instance, call coalesce(100) on the input before (or play with different
> values here). You should also try explicitly setting fewer partitions on
> the reduce side as well when you do the cogroup. Since you only have a few
> machines, you don't "need" the extra partitions to add more parallelism to
> the reduce. So by playing with the map and/or reduce partitions you can
> probably fix this.
>
> At least, I'm interested in hearing how that works!
>
> In general, people usually don't hit this, because it's not so common to
> have thousands of reducers *per machine* (i.e. most machines only have a
> few cores anyway).
>
> - Patrick
>
>
>
>
> On Sat, Oct 26, 2013 at 1:54 PM, Aaron Davidson <il...@gmail.com>wrote:
>
>> Thanks again for the great detail! Your setup sounds correct, and all the
>> numbers you're seeing suggest that no foul play (i.e., bug) is at work. I
>> think this is a simple case of the blockInfo and blockToFileSegmentMap being
>> poorly optimized, especially the latter. I will look into reducing the
>> memory footprint of the blockToFileSegmentMap using similar techniques as
>> Josh mentioned for the blockInfo map. I've created a jira (SPARK-946<https://spark-project.atlassian.net/browse/SPARK-946>)
>> to track this issue.
>>
>> In the meantime, 4GB of memory does seem really low, so more or
>> higher-memory machines is probably the only route to get your job running
>> successfully right now.
>>
>>
>> On Sat, Oct 26, 2013 at 1:13 PM, Stephen Haberman <
>> stephen.haberman@gmail.com> wrote:
>>
>>> Hi Patrick,
>>>
>>> > Just wondering, how many reducers are you using in this shuffle? By
>>> > 7,000 partitions, I'm assuming you mean the map side of the shuffle.
>>> > What about the reduce side?
>>>
>>> 7,000 on that side as well.
>>>
>>> We're loading about a month's worth of data in one RDD, with ~7,000
>>> partitions, and cogrouping it with another RDD with 50 partitions, and
>>> the resulting RDD also has 7,000 partitions.
>>>
>>> (As, since we don't have spark.default.parallelism set, the
>>> defaultPartitioner logic chooses the max of [50, 7,0000] to be the next
>>> partition size.)
>>>
>>> I believe that is what you're asking by number of reducers? The number
>>> of partitions in the post-cogroup ShuffledRDD?
>>>
>>> Also, AFAICT, I don't believe we get to the reduce/ShuffledRDD side of
>>> this cogroup--after 2,000-3,000 ShuffleMapTasks on the map side is when
>>> it bogs down.
>>>
>>> Thanks,
>>> Stephen
>>>
>>>
>>
>

Re: oome from blockmanager

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Stephen,

The issue is this. When you do a shuffle form N map partitions to M reduce
partitions, there are N * M output blocks created and each one is tracked.
That's why all these per-block overheads are causing you to OOM.

One way to fix this is to reduce the per-block overhead which Josh and
Aaron are discussing. It actually used to be even worse, we used to also
create N * M files, but Aaron recently submitted a patch to decrease the
number of files.

Another way to fix it is to modify your job to create fewer partitions. Try
coalesce'ing the input into fewer partitions before you shuffle. For
instance, call coalesce(100) on the input before (or play with different
values here). You should also try explicitly setting fewer partitions on
the reduce side as well when you do the cogroup. Since you only have a few
machines, you don't "need" the extra partitions to add more parallelism to
the reduce. So by playing with the map and/or reduce partitions you can
probably fix this.

At least, I'm interested in hearing how that works!

In general, people usually don't hit this, because it's not so common to
have thousands of reducers *per machine* (i.e. most machines only have a
few cores anyway).

- Patrick




On Sat, Oct 26, 2013 at 1:54 PM, Aaron Davidson <il...@gmail.com> wrote:

> Thanks again for the great detail! Your setup sounds correct, and all the
> numbers you're seeing suggest that no foul play (i.e., bug) is at work. I
> think this is a simple case of the blockInfo and blockToFileSegmentMap being
> poorly optimized, especially the latter. I will look into reducing the
> memory footprint of the blockToFileSegmentMap using similar techniques as
> Josh mentioned for the blockInfo map. I've created a jira (SPARK-946<https://spark-project.atlassian.net/browse/SPARK-946>)
> to track this issue.
>
> In the meantime, 4GB of memory does seem really low, so more or
> higher-memory machines is probably the only route to get your job running
> successfully right now.
>
>
> On Sat, Oct 26, 2013 at 1:13 PM, Stephen Haberman <
> stephen.haberman@gmail.com> wrote:
>
>> Hi Patrick,
>>
>> > Just wondering, how many reducers are you using in this shuffle? By
>> > 7,000 partitions, I'm assuming you mean the map side of the shuffle.
>> > What about the reduce side?
>>
>> 7,000 on that side as well.
>>
>> We're loading about a month's worth of data in one RDD, with ~7,000
>> partitions, and cogrouping it with another RDD with 50 partitions, and
>> the resulting RDD also has 7,000 partitions.
>>
>> (As, since we don't have spark.default.parallelism set, the
>> defaultPartitioner logic chooses the max of [50, 7,0000] to be the next
>> partition size.)
>>
>> I believe that is what you're asking by number of reducers? The number
>> of partitions in the post-cogroup ShuffledRDD?
>>
>> Also, AFAICT, I don't believe we get to the reduce/ShuffledRDD side of
>> this cogroup--after 2,000-3,000 ShuffleMapTasks on the map side is when
>> it bogs down.
>>
>> Thanks,
>> Stephen
>>
>>
>

Re: oome from blockmanager

Posted by Aaron Davidson <il...@gmail.com>.
Thanks again for the great detail! Your setup sounds correct, and all the
numbers you're seeing suggest that no foul play (i.e., bug) is at work. I
think this is a simple case of the blockInfo and blockToFileSegmentMap being
poorly optimized, especially the latter. I will look into reducing the
memory footprint of the blockToFileSegmentMap using similar techniques as
Josh mentioned for the blockInfo map. I've created a jira
(SPARK-946<https://spark-project.atlassian.net/browse/SPARK-946>)
to track this issue.

In the meantime, 4GB of memory does seem really low, so more or
higher-memory machines is probably the only route to get your job running
successfully right now.


On Sat, Oct 26, 2013 at 1:13 PM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

> Hi Patrick,
>
> > Just wondering, how many reducers are you using in this shuffle? By
> > 7,000 partitions, I'm assuming you mean the map side of the shuffle.
> > What about the reduce side?
>
> 7,000 on that side as well.
>
> We're loading about a month's worth of data in one RDD, with ~7,000
> partitions, and cogrouping it with another RDD with 50 partitions, and
> the resulting RDD also has 7,000 partitions.
>
> (As, since we don't have spark.default.parallelism set, the
> defaultPartitioner logic chooses the max of [50, 7,0000] to be the next
> partition size.)
>
> I believe that is what you're asking by number of reducers? The number
> of partitions in the post-cogroup ShuffledRDD?
>
> Also, AFAICT, I don't believe we get to the reduce/ShuffledRDD side of
> this cogroup--after 2,000-3,000 ShuffleMapTasks on the map side is when
> it bogs down.
>
> Thanks,
> Stephen
>
>

Re: oome from blockmanager

Posted by Stephen Haberman <st...@gmail.com>.
Hi Patrick,

> Just wondering, how many reducers are you using in this shuffle? By
> 7,000 partitions, I'm assuming you mean the map side of the shuffle.
> What about the reduce side?

7,000 on that side as well.

We're loading about a month's worth of data in one RDD, with ~7,000
partitions, and cogrouping it with another RDD with 50 partitions, and
the resulting RDD also has 7,000 partitions.

(As, since we don't have spark.default.parallelism set, the
defaultPartitioner logic chooses the max of [50, 7,0000] to be the next
partition size.)

I believe that is what you're asking by number of reducers? The number
of partitions in the post-cogroup ShuffledRDD?

Also, AFAICT, I don't believe we get to the reduce/ShuffledRDD side of
this cogroup--after 2,000-3,000 ShuffleMapTasks on the map side is when
it bogs down.

Thanks,
Stephen


Re: oome from blockmanager

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Stephen,

Just wondering, how many reducers are you using in this shuffle? By 7,000
partitions, I'm assuming you mean the map side of the shuffle. What about
the reduce side?

- Patrick


On Sat, Oct 26, 2013 at 11:43 AM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

> Hi,
>
> By dropping spark.shuffle.file.buffer.kb to 10k and using Snappy
> (thanks, Aaron), the job I'm trying to run is no longer OOMEing because
> of 300k LZF buffers taking up 4g of RAM.
>
> But...now it's OOMEing because BlockManager is taking ~3.5gb of RAM
> (which is ~90% of the available heap).
>
> Specifically, it's two ConcurrentHashMaps:
>
> * BlockManager.blockInfo has ~1gb retained, AFAICT from ~5.5 million
>   entries of (ShuffleBlockId, (BlockInfo, Long))
>
> * BlockManager's DiskBlockManager.blockToFileSegmentMap has ~2.3gb
>   retained, AFAICT from about the same ~5.5 million entries of
>   (ShuffleBlockId, (FileSegment, Long)).
>
> The job stalls about 3,000 tasks through a 7,000-partition shuffle that
> is loading ~500gb from S3 on 5 m1.large (4gb heap) machines. The job
> did a few smaller ~50-partition shuffles before this larger one, but
> nothing crazy. It's an on-demand/EMR cluster, in standalone mode.
>
> Both of these maps are TimeStampedHashMaps, which kind of makes me
> shudder, but we have the cleaner disabled which AFAIK is what we want,
> because we aren't running long-running streaming jobs. And AFAIU if the
> hash map did get cleaned up mid-shuffle, lookups would just start
> failing (which was actually happening for this job on Spark 0.7 and is
> what prompted us to get around to trying Spark 0.8).
>
> So, I haven't really figured out BlockManager yet--any hints on what we
> could do here? More machines? Should there really be this many entries
> in it for a shuffle of this size?
>
> I know 5 machines/4gb of RAM isn't a lot, and I could use more if
> needed, but I just expected the job to go slower, not OOME.
>
> Also, I technically have a heap dump from a m1.xlarge (~15gb of RAM)
> cluster that also OOMEd on the same job, but I can't open the file on
> my laptop, so I can't tell if it was OOMEing for this issue or another
> one (it was not using snappy, but using 10kb file buffers, so I'm
> interested to see what happened to it.)
>
> - Stephen
>
>