Posted to dev@spark.apache.org by Kyle Ellrott <ke...@soe.ucsc.edu> on 2013/10/26 19:53:18 UTC

SPARK-942

I was wondering if anybody had any thoughts on the best way to tackle
SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
Basically, Spark takes an iterator from a flatMap call and, because I tell
it that it needs to persist, Spark proceeds to push it all into an array
before deciding that it doesn't have enough memory and trying to serialize
it to disk, and somewhere along the line it runs out of memory. For my
particular operation, the function returns an iterator that reads data out
of a file, and the size of the files passed to that function can vary
greatly (from a few kilobytes to a few gigabytes). The funny thing is that
if I do a straight 'map' operation after the flatMap, everything works,
because Spark just passes the iterator forward and never tries to expand
the whole thing into memory. But I need to do a reduceByKey across all the
records, so I'd like to persist to disk first, and that is where I hit this
snag.
I've already set up a unit test to replicate the problem, and I know the
area of the code that would need to be fixed.
I'm just hoping for some tips on the best way to fix the problem.

Kyle

Re: SPARK-942

Posted by Evan Chan <ev...@ooyala.com>.
+1 for IteratorWithSizeEstimate.

I believe that today only HadoopRDDs are able to give fine-grained
progress; with an enhanced iterator interface (which can still expose
the base Iterator trait) we can extend the possibility of fine-grained
progress to all RDDs that implement the enhanced iterator.
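
For concreteness, here is one rough shape such an interface could take (a
hypothetical sketch only; neither the trait nor the wrapper below exists in
Spark):

// Hypothetical trait: an iterator that can also report a rough estimate
// of how much data it expects to produce.
trait IteratorWithSizeEstimate[+A] extends Iterator[A] {
  // Estimated number of remaining elements, or None if unknown.
  def sizeEstimate: Option[Long]
}

// Example wrapper: derive an element-count estimate from the size of the
// underlying file and an assumed average record size.
class FileRecordIterator(records: Iterator[String],
    fileBytes: Long, avgRecordBytes: Long)
  extends IteratorWithSizeEstimate[String] {
  override def hasNext: Boolean = records.hasNext
  override def next(): String = records.next()
  override def sizeEstimate: Option[Long] =
    if (avgRecordBytes > 0) Some(fileBytes / avgRecordBytes) else None
}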

On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman
<st...@gmail.com> wrote:
>
>> The problem is that the iterator interface only defines 'hasNext' and
>> 'next' methods.
>
> Just a comment from the peanut gallery, but FWIW it seems like being
> able to ask "how much data is here" would be a useful thing for Spark
> to know, even if that means moving away from Iterator itself, or
> something like IteratorWithSizeEstimate/something/something.
>
> Not only for this, but so that, ideally, Spark could basically do
> dynamic partitioning.
>
> E.g. when we load a month's worth of data, it's X GB, but after a few
> maps and filters, it's X/100 GB, so could use X/100 partitions instead.
>
> But right now all partitioning decisions are made up-front,
> via .coalesce/etc. type hints from the programmer, and it seems if
> Spark could delay making partitioning decisions until each RDD could
> lazily eval/sample a few lines (hand waving), that would be super
> sexy from our perspective, in terms of doing automatic perf/partition
> optimization.
>
> Huge disclaimer that this is probably a big pita to implement, and
> could likely not be as worthwhile as I naively think it would be.
>
> - Stephen



-- 
Evan Chan
Staff Engineer
ev@ooyala.com

Re: SPARK-942

Posted by Aaron Davidson <il...@gmail.com>.
By the way, there are a few places one can look for logs while testing:
- Unit test runner logs (should contain driver and worker logs): core/target/unit-tests.log
- Executor logs: work/app-*

This should help find the root exception when you see one caught by the
DAGScheduler, such as in this case.


On Tue, Nov 12, 2013 at 6:21 PM, Kyle Ellrott <ke...@soe.ucsc.edu> wrote:

> Sure, do you have a URL for your patch?
>
> Kyle
> On Nov 12, 2013 5:59 PM, "Xia, Junluan" <ju...@intel.com> wrote:
>
> > Hi Kyle,
> >
> > I have also built a patch for this issue, and it passes the tests. Could
> > you help me review it if you are free?
> >
> > -----Original Message-----
> > From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu]
> > Sent: Wednesday, November 13, 2013 8:44 AM
> > To: dev@spark.incubator.apache.org
> > Subject: Re: SPARK-942
> >
> > I've posted a patch that I think produces the correct behavior at
> >
> >
> https://github.com/kellrott/incubator-spark/commit/efe1102c8a7436b2fe112d3bece9f35fedea0dc8
> >
> > It works fine on my programs, but if I run the unit tests, I get errors
> > like:
> >
> > [info] - large number of iterations *** FAILED ***
> > [info]   org.apache.spark.SparkException: Job aborted: Task 4.0:0 failed
> > more than 0 times; aborting job java.lang.ClassCastException:
> > scala.collection.immutable.StreamIterator cannot be cast to
> > scala.collection.mutable.ArrayBuffer
> > [info]   at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:818)
> > [info]   at
> >
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:816)
> > [info]   at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> > [info]   at
> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > [info]   at
> >
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:816)
> > [info]   at
> >
> >
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:431)
> > [info]   at org.apache.spark.scheduler.DAGScheduler.org
> > $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:493)
> > [info]   at
> >
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)
> >
> >
> > I can't figure out the line number where the original error occurred,
> > or why I can't replicate them in my various test programs.
> > Any help would be appreciated.
> >
> > Kyle
> >
> >
> >
> >
> >
> >
> > On Tue, Nov 12, 2013 at 11:35 AM, Alex Boisvert <alex.boisvert@gmail.com> wrote:
> >
> > > On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman <
> > > stephen.haberman@gmail.com> wrote:
> > >
> > > > Huge disclaimer that this is probably a big pita to implement, and
> > > > could likely not be as worthwhile as I naively think it would be.
> > > >
> > >
> > > My perspective on this is that it's already a big pita for Spark users today.
> > >
> > > In the absence of explicit directions/hints, Spark should be able to
> > > make ballpark estimates and conservatively pick # of partitions,
> > > storage strategies (e.g., memory vs disk) and other runtime parameters
> > > that fit the deployment architecture/capacities. If this requires code
> > > and extra runtime resources for sampling/measuring data, guesstimating
> > > job size, and so on, so be it.
> > >
> > > Users want working jobs first.  Optimal performance / resource
> > > utilization follow from that.
> > >
> >
>

RE: SPARK-942

Posted by Kyle Ellrott <ke...@soe.ucsc.edu>.
Sure, do you have a URL for your patch?

Kyle
On Nov 12, 2013 5:59 PM, "Xia, Junluan" <ju...@intel.com> wrote:

> Hi Kyle,
>
> I have also built a patch for this issue, and it passes the tests. Could
> you help me review it if you are free?
>
> -----Original Message-----
> From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu]
> Sent: Wednesday, November 13, 2013 8:44 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> I've posted a patch that I think produces the correct behavior at
>
> https://github.com/kellrott/incubator-spark/commit/efe1102c8a7436b2fe112d3bece9f35fedea0dc8
>
> It works fine on my programs, but if I run the unit tests, I get errors
> like:
>
> [info] - large number of iterations *** FAILED ***
> [info]   org.apache.spark.SparkException: Job aborted: Task 4.0:0 failed
> more than 0 times; aborting job java.lang.ClassCastException:
> scala.collection.immutable.StreamIterator cannot be cast to
> scala.collection.mutable.ArrayBuffer
> [info]   at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:818)
> [info]   at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:816)
> [info]   at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> [info]   at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> [info]   at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:816)
> [info]   at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:431)
> [info]   at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:493)
> [info]   at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)
>
>
> I can't figure out the line number where the original error occurred,
> or why I can't replicate them in my various test programs.
> Any help would be appreciated.
>
> Kyle
>
>
>
>
>
>
> On Tue, Nov 12, 2013 at 11:35 AM, Alex Boisvert <alex.boisvert@gmail.com> wrote:
>
> > On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman <
> > stephen.haberman@gmail.com> wrote:
> >
> > > Huge disclaimer that this is probably a big pita to implement, and
> > > could likely not be as worthwhile as I naively think it would be.
> > >
> >
> > My perspective on this is that it's already a big pita for Spark users today.
> >
> > In the absence of explicit directions/hints, Spark should be able to
> > make ballpark estimates and conservatively pick # of partitions,
> > storage strategies (e.g., memory vs disk) and other runtime parameters
> > that fit the deployment architecture/capacities. If this requires code
> > and extra runtime resources for sampling/measuring data, guesstimating
> > job size, and so on, so be it.
> >
> > Users want working jobs first.  Optimal performance / resource
> > utilization follow from that.
> >
>

RE: SPARK-942

Posted by "Xia, Junluan" <ju...@intel.com>.
Hi Kyle,

I have also built a patch for this issue, and it passes the tests. Could you help me review it if you are free?

-----Original Message-----
From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu] 
Sent: Wednesday, November 13, 2013 8:44 AM
To: dev@spark.incubator.apache.org
Subject: Re: SPARK-942

I've posted a patch that I think produces the correct behavior at
https://github.com/kellrott/incubator-spark/commit/efe1102c8a7436b2fe112d3bece9f35fedea0dc8

It works fine on my programs, but if I run the unit tests, I get errors
like:

[info] - large number of iterations *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted: Task 4.0:0 failed
more than 0 times; aborting job java.lang.ClassCastException:
scala.collection.immutable.StreamIterator cannot be cast to scala.collection.mutable.ArrayBuffer
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:818)
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:816)
[info]   at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
[info]   at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:816)
[info]   at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:431)
[info]   at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:493)
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)


I can't figure out the line number where the original error occurred, or why I can't replicate them in my various test programs.
Any help would be appreciated.

Kyle






On Tue, Nov 12, 2013 at 11:35 AM, Alex Boisvert <al...@gmail.com> wrote:

> On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman < 
> stephen.haberman@gmail.com> wrote:
>
> > Huge disclaimer that this is probably a big pita to implement, and 
> > could likely not be as worthwhile as I naively think it would be.
> >
>
> My perspective on this is that it's already a big pita for Spark users today.
>
> In the absence of explicit directions/hints, Spark should be able to
> make ballpark estimates and conservatively pick # of partitions,
> storage strategies (e.g., memory vs disk) and other runtime parameters
> that fit the deployment architecture/capacities. If this requires code
> and extra runtime resources for sampling/measuring data, guesstimating
> job size, and so on, so be it.
>
> Users want working jobs first.  Optimal performance / resource 
> utilization follow from that.
>

Re: SPARK-942

Posted by Kyle Ellrott <ke...@soe.ucsc.edu>.
I've posted a patch that I think produces the correct behavior at
https://github.com/kellrott/incubator-spark/commit/efe1102c8a7436b2fe112d3bece9f35fedea0dc8

It works fine on my programs, but if I run the unit tests, I get errors
like:

[info] - large number of iterations *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted: Task 4.0:0 failed
more than 0 times; aborting job java.lang.ClassCastException:
scala.collection.immutable.StreamIterator cannot be cast to
scala.collection.mutable.ArrayBuffer
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:818)
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:816)
[info]   at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
[info]   at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
[info]   at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:816)
[info]   at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:431)
[info]   at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:493)
[info]   at
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:158)


I can't figure out the line number where the original error occurred, or
why I can't replicate them in my various test programs.
Any help would be appreciated.

Kyle






On Tue, Nov 12, 2013 at 11:35 AM, Alex Boisvert <al...@gmail.com> wrote:

> On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman <
> stephen.haberman@gmail.com> wrote:
>
> > Huge disclaimer that this is probably a big pita to implement, and
> > could likely not be as worthwhile as I naively think it would be.
> >
>
> My perspective on this is that it's already a big pita for Spark users today.
>
> In the absence of explicit directions/hints, Spark should be able to make
> ballpark estimates and conservatively pick # of partitions, storage
> strategies (e.g., memory vs disk) and other runtime parameters that fit the
> deployment architecture/capacities. If this requires code and extra
> runtime resources for sampling/measuring data, guesstimating job size, and
> so on, so be it.
>
> Users want working jobs first.  Optimal performance / resource utilization
> follow from that.
>

Re: SPARK-942

Posted by Alex Boisvert <al...@gmail.com>.
On Tue, Nov 12, 2013 at 11:07 AM, Stephen Haberman <
stephen.haberman@gmail.com> wrote:

> Huge disclaimer that this is probably a big pita to implement, and
> could likely not be as worthwhile as I naively think it would be.
>

My perspective on this is that it's already a big pita for Spark users today.

In the absence of explicit directions/hints, Spark should be able to make
ballpark estimates and conservatively pick # of partitions, storage
strategies (e.g., memory vs disk) and other runtime parameters that fit the
deployment architecture/capacities. If this requires code and extra
runtime resources for sampling/measuring data, guesstimating job size, and
so on, so be it.

Users want working jobs first.  Optimal performance / resource utilization
follow from that.

Re: SPARK-942

Posted by Stephen Haberman <st...@gmail.com>.
> The problem is that the iterator interface only defines 'hasNext' and
> 'next' methods.

Just a comment from the peanut gallery, but FWIW it seems like being
able to ask "how much data is here" would be a useful thing for Spark
to know, even if that means moving away from Iterator itself, or
something like IteratorWithSizeEstimate/something/something.

Not only for this, but so that, ideally, Spark could basically do
dynamic partitioning.

E.g. when we load a month's worth of data, it's X GB, but after a few
maps and filters, it's X/100 GB, so could use X/100 partitions instead.

But right now all partitioning decisions are made up-front,
via .coalesce/etc. type hints from the programmer, and it seems if
Spark could delay making partitioning decisions until each RDD could
lazily eval/sample a few lines (hand waving), that would be super
sexy from our perspective, in terms of doing automatic perf/partition
optimization.

Huge disclaimer that this is probably a big pita to implement, and
could likely not be as worthwhile as I naively think it would be.

- Stephen
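
For illustration, a back-of-the-envelope sketch of how such a size estimate
could drive the dynamic partitioning Stephen describes (a hypothetical
helper; the 128 MB target per partition is an assumed default, not current
Spark behavior):

object PartitionSizing {
  // Assumed target partition size; purely illustrative.
  val targetBytesPerPartition: Long = 128L * 1024 * 1024

  // Choose a partition count from an estimated total dataset size.
  def targetPartitions(estimatedTotalBytes: Long): Int =
    math.max(1, math.ceil(estimatedTotalBytes.toDouble / targetBytesPerPartition).toInt)
}

// E.g. a month of data estimated at 50 GB suggests ~400 partitions; if
// maps and filters shrink it to 0.5 GB, a re-estimate would suggest ~4.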

Re: SPARK-942

Posted by Koert Kuipers <ko...@tresata.com>.
If Spark wants to compete as an alternative to MapReduce on Hadoop
clusters, then the assumption should not be that 99.9% of the time data
will fit in memory. It will not.

That said, I am fine with a solution where one has to use DISK_ONLY
for this, since that is exactly what MapReduce does anyhow.


On Mon, Nov 11, 2013 at 8:14 PM, Xia, Junluan <ju...@intel.com> wrote:

> Hi Kyle
>
> I totally agree with you. The 'best' solution currently is to handle only
> the "DISK_ONLY" scenario and pass the iterator directly to the BlockManager.
>
> It is too expensive to complicate the code for a 0.1% possibility before
> we have a perfect solution.
>
> -----Original Message-----
> From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu]
> Sent: Tuesday, November 12, 2013 6:28 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> The problem is that the iterator interface only defines 'hasNext' and
> 'next' methods. So I don't think that there is really any way to estimate
> the total count until the iterator is done traversing. In my particular
> case, I'm wrapping an OpenRDF RIO iterator that is parsing a gzipped file
> stream, and one of the files just happens to be several gigabytes large.
> Each of the individual elements spit out by the iterator are all the same,
> just sometimes it spits out a few million more than normal.
>
> It's not a normal occurrence. 99.9% of the time, when people call 'flatMap'
> they will probably be producing arrays that fit nicely into memory. Trying
> to do a bunch of extra bookkeeping (i.e. unrolling the iterator one element
> at a time, trying to figure out if it's gotten too big yet) may be an extra
> complication that makes the code much more complicated while only providing
> a solution for extreme edge cases.
>
> I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and
> 'MEMORY_AND_DISK' behaviors the same. If the user knows that their code
> could produce these 'mega-iterators' then they pass a 'DISK_ONLY' and that
> iterator gets passed straight to the BlockManager to be written straight to
> disk. Then all we have to do is change "def put(blockId: BlockId, values:
> Iterator[Any], level: StorageLevel, tellMaster: Boolean)"
> (BlockManager.scala:452) to call 'diskStore.putValues' directly, rather
> than unrolling the iterator and passing it on to the standard 'doPut' like
> it does now.
>
> Kyle
>
>
>
>
> On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan <ju...@intel.com>
> wrote:
>
> > Hi
> >
> > I think it is a bad user experience to throw an OOM exception when the
> > user only persists the RDD with DISK_ONLY or MEMORY_AND_DISK.
> >
> > As Kyle mentioned below, the key point is that the CacheManager has
> > unrolled the whole Iterator into an ArrayBuffer without a free-memory
> > check; we should estimate the size of the unrolled iterator object and
> > check whether it is beyond the current free memory size.
> >
> > We could separate this into three scenarios:
> >
> > 1. For MEMORY_ONLY, I think it is the normal case to throw an OOM
> > exception and require the user to adjust their application.
> > 2. For MEMORY_AND_DISK, we should check whether free memory could hold
> > the unrolled ArrayBuffer; if yes, it goes down the usual path, and if
> > no, we degrade it to DISK_ONLY.
> > 3. For DISK_ONLY, I think we need not unroll the whole iterator into an
> > ArrayBuffer, because we can write the iterator out to disk one element
> > at a time.
> >
> > So the issue is how to judge whether free memory could hold the
> > unrolled iterator before it becomes an ArrayBuffer.
> >
> > Is there any solution for this? Could we just unroll the first 10% of
> > the iterator into an ArrayBuffer, estimate that size, and take the
> > total as 10x the size of that 10%? Apparently it is not perfect.
> >
> > -----Original Message-----
> > From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu]
> > Sent: Thursday, November 07, 2013 2:59 AM
> > To: dev@spark.incubator.apache.org
> > Subject: Re: SPARK-942
> >
> > I think the usage has to be calculated as the iterator is being put
> > into the ArrayBuffer.
> > Right now the BlockManager, in its put method, when it gets an
> > iterator named 'values', uses the simple stanza of:
> >
> > def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
> >     tellMaster: Boolean): Long = {
> >   val elements = new ArrayBuffer[Any]
> >   elements ++= values
> >   put(blockId, elements, level, tellMaster)
> > }
> >
> > completely unrolling the iterator in a single line. Above it, the
> > CacheManager does the exact same thing with:
> >
> > val elements = new ArrayBuffer[Any]
> > elements ++= computedValues
> > blockManager.put(key, elements, storageLevel, tellMaster = true)
> >
> > We would probably have to implement some sort of 'IteratorBuffer'
> > class, which would wrap an iterator. It would include a method to
> > unroll an iterator into a buffer up to a point, something like
> >
> > def unroll(maxMem: Long): Boolean = { ... }
> >
> > and it would return true if maxMem was hit, at which point the
> > BlockManager could read through the already-cached values, then
> > continue on through the rest of the iterator, dumping all the values
> > to file. If it unrolled without hitting maxMem (which would probably
> > be most of the time), the class would simply wrap the ArrayBuffer of
> > cached values.
> >
> > Kyle
> >
> >
> >
> > On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <rx...@apache.org> wrote:
> >
> > > It's not a very elegant solution, but one possibility is for the
> > > CacheManager to check whether it will have enough space. If it is
> > > running out of space, it skips buffering the output of the iterator
> > > and writes it directly to disk (if the storage level allows that).
> > >
> > > But it is still tricky to know whether we will run out of space
> > > before we even start running the iterator. One possibility is to use
> > > sizing data from previous partitions to estimate the size of the
> > > current partition (i.e. estimated in-memory size = (average
> > > in-memory/input size ratio over previous partitions) * current input
> > > size).
> > >
> > > Do you have any ideas on this one, Kyle?
> > >
> > >
> > > On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <kellrott@soe.ucsc.edu> wrote:
> > >
> > > > I was wondering if anybody had any thoughts on the best way to tackle
> > > > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> > > > Basically, Spark takes an iterator from a flatMap call and, because
> > > > I tell it that it needs to persist, Spark proceeds to push it all
> > > > into an array before deciding that it doesn't have enough memory
> > > > and trying to serialize it to disk, and somewhere along the line it
> > > > runs out of memory. For my particular operation, the function
> > > > returns an iterator that reads data out of a file, and the size of
> > > > the files passed to that function can vary greatly (from a few
> > > > kilobytes to a few gigabytes). The funny thing is that if I do a
> > > > straight 'map' operation after the flatMap, everything works,
> > > > because Spark just passes the iterator forward and never tries to
> > > > expand the whole thing into memory. But I need to do a reduceByKey
> > > > across all the records, so I'd like to persist to disk first, and
> > > > that is where I hit this snag.
> > > > I've already set up a unit test to replicate the problem, and I
> > > > know the area of the code that would need to be fixed.
> > > > I'm just hoping for some tips on the best way to fix the problem.
> > > >
> > > > Kyle
> > > >
> > >
> >
>

RE: SPARK-942

Posted by "Xia, Junluan" <ju...@intel.com>.
Hi Kyle

I totally agree with you. The 'best' solution currently is to handle only the "DISK_ONLY" scenario and pass the iterator directly to the BlockManager.

It is too expensive to complicate the code for a 0.1% possibility before we have a perfect solution.

-----Original Message-----
From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu] 
Sent: Tuesday, November 12, 2013 6:28 AM
To: dev@spark.incubator.apache.org
Subject: Re: SPARK-942

The problem is that the iterator interface only defines 'hasNext' and 'next' methods. So I don't think that there is really any way to estimate the total count until the iterator is done traversing. In my particular case, I'm wrapping an OpenRDF RIO iterator that is parsing a gzipped file stream, and one of the files just happens to be several gigabytes large.
Each of the individual elements spit out by the iterator are all the same, just sometimes it spits out a few million more than normal.

It's not a normal occurrence. 99.9% of the time, when people call 'flatMap' they will probably be producing arrays that fit nicely into memory. Trying to do a bunch of extra bookkeeping (i.e. unrolling the iterator one element at a time, trying to figure out if it's gotten too big yet) may be an extra complication that makes the code much more complicated while only providing a solution for extreme edge cases.

I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and 'MEMORY_AND_DISK' behaviors the same. If the user knows that their code could produce these 'mega-iterators' then they pass a 'DISK_ONLY' and that iterator gets passed straight to the BlockManager to be written straight to disk. Then all we have to do is change "def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)" (BlockManager.scala:452) to call 'diskStore.putValues' directly, rather than unrolling the iterator and passing it on to the standard 'doPut' like it does now.

Kyle




On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan <ju...@intel.com> wrote:

> Hi
>
> I think it is a bad user experience to throw an OOM exception when the
> user only persists the RDD with DISK_ONLY or MEMORY_AND_DISK.
>
> As Kyle mentioned below, the key point is that the CacheManager has
> unrolled the whole Iterator into an ArrayBuffer without a free-memory
> check; we should estimate the size of the unrolled iterator object and
> check whether it is beyond the current free memory size.
>
> We could separate this into three scenarios:
>
> 1. For MEMORY_ONLY, I think it is the normal case to throw an OOM
> exception and require the user to adjust their application.
> 2. For MEMORY_AND_DISK, we should check whether free memory could hold
> the unrolled ArrayBuffer; if yes, it goes down the usual path, and if
> no, we degrade it to DISK_ONLY.
> 3. For DISK_ONLY, I think we need not unroll the whole iterator into an
> ArrayBuffer, because we can write the iterator out to disk one element
> at a time.
>
> So the issue is how to judge whether free memory could hold the
> unrolled iterator before it becomes an ArrayBuffer.
>
> Is there any solution for this? Could we just unroll the first 10% of
> the iterator into an ArrayBuffer, estimate that size, and take the
> total as 10x the size of that 10%? Apparently it is not perfect.
>
> -----Original Message-----
> From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu]
> Sent: Thursday, November 07, 2013 2:59 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> I think the usage has to be calculated as the iterator is being put
> into the ArrayBuffer.
> Right now the BlockManager, in its put method, when it gets an
> iterator named 'values', uses the simple stanza of:
>
> def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
>     tellMaster: Boolean): Long = {
>   val elements = new ArrayBuffer[Any]
>   elements ++= values
>   put(blockId, elements, level, tellMaster)
> }
>
> completely unrolling the iterator in a single line. Above it, the
> CacheManager does the exact same thing with:
>
> val elements = new ArrayBuffer[Any]
> elements ++= computedValues
> blockManager.put(key, elements, storageLevel, tellMaster = true)
>
> We would probably have to implement some sort of 'IteratorBuffer'
> class, which would wrap an iterator. It would include a method to
> unroll an iterator into a buffer up to a point, something like
>
> def unroll(maxMem: Long): Boolean = { ... }
>
> and it would return true if maxMem was hit, at which point the
> BlockManager could read through the already-cached values, then
> continue on through the rest of the iterator, dumping all the values
> to file. If it unrolled without hitting maxMem (which would probably
> be most of the time), the class would simply wrap the ArrayBuffer of
> cached values.
>
> Kyle
>
>
>
> On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <rx...@apache.org> wrote:
>
> > It's not a very elegant solution, but one possibility is for the
> > CacheManager to check whether it will have enough space. If it is
> > running out of space, it skips buffering the output of the iterator
> > and writes it directly to disk (if the storage level allows that).
> >
> > But it is still tricky to know whether we will run out of space
> > before we even start running the iterator. One possibility is to use
> > sizing data from previous partitions to estimate the size of the
> > current partition (i.e. estimated in-memory size = (average
> > in-memory/input size ratio over previous partitions) * current input
> > size).
> >
> > Do you have any ideas on this one, Kyle?
> >
> >
> > On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <kellrott@soe.ucsc.edu> wrote:
> >
> > > I was wondering if anybody had any thoughts on the best way to tackle
> > > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> > > Basically, Spark takes an iterator from a flatMap call and, because
> > > I tell it that it needs to persist, Spark proceeds to push it all
> > > into an array before deciding that it doesn't have enough memory and
> > > trying to serialize it to disk, and somewhere along the line it runs
> > > out of memory. For my particular operation, the function returns an
> > > iterator that reads data out of a file, and the size of the files
> > > passed to that function can vary greatly (from a few kilobytes to a
> > > few gigabytes). The funny thing is that if I do a straight 'map'
> > > operation after the flatMap, everything works, because Spark just
> > > passes the iterator forward and never tries to expand the whole
> > > thing into memory. But I need to do a reduceByKey across all the
> > > records, so I'd like to persist to disk first, and that is where I
> > > hit this snag.
> > > I've already set up a unit test to replicate the problem, and I
> > > know the area of the code that would need to be fixed.
> > > I'm just hoping for some tips on the best way to fix the problem.
> > >
> > > Kyle
> > >
> >
>

Re: SPARK-942

Posted by Kyle Ellrott <ke...@soe.ucsc.edu>.
The problem is that the iterator interface only defines 'hasNext' and
'next' methods. So I don't think that there is really any way to estimate
the total count until the iterator is done traversing. In my particular
case, I'm wrapping an OpenRDF RIO iterator that is parsing a gzipped file
stream, and one of the files just happens to be several gigabytes large.
Each of the individual elements spit out by the iterator are all the same,
just sometimes it spits out a few million more than normal.

It's not a normal occurrence. 99.9% of the time, when people call 'flatMap'
they will probably be producing arrays that fit nicely into memory. Trying
to do a bunch of extra bookkeeping (i.e. unrolling the iterator one element
at a time, trying to figure out if it's gotten too big yet) may be an extra
complication that makes the code much more complicated while only providing
a solution for extreme edge cases.

I think the 'best' way to go would be to leave the 'MEMORY_ONLY' and
'MEMORY_AND_DISK' behaviors the same. If the user knows that their code
could produce these 'mega-iterators' then they pass a 'DISK_ONLY' and that
iterator gets passed straight to the BlockManager to be written straight to
disk. Then all we have to do is change "def put(blockId: BlockId, values:
Iterator[Any], level: StorageLevel, tellMaster: Boolean)"
(BlockManager.scala:452) to call 'diskStore.putValues' directly, rather
than unrolling the iterator and passing it on to the standard 'doPut' like
it does now.
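
A minimal sketch of that change (hedged: written against the put method
quoted elsewhere in this thread; the exact diskStore.putValues signature may
differ, so treat this as pseudocode for the dispatch rather than a drop-in
patch):

import scala.collection.mutable.ArrayBuffer

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
    tellMaster: Boolean): Long = {
  if (level == StorageLevel.DISK_ONLY) {
    // Stream straight to disk without unrolling the iterator in memory.
    // Assumes a putValues overload accepting an Iterator; the version in
    // the tree at the time took an ArrayBuffer, so this is the proposal,
    // not the existing API.
    val result = diskStore.putValues(blockId, values, level, returnValues = false)
    result.size
  } else {
    // Existing behavior: unroll into an ArrayBuffer and go through doPut.
    val elements = new ArrayBuffer[Any]
    elements ++= values
    put(blockId, elements, level, tellMaster)
  }
}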

Kyle




On Mon, Nov 11, 2013 at 5:18 AM, Xia, Junluan <ju...@intel.com> wrote:

> Hi
>
> I think it is a bad user experience to throw an OOM exception when the
> user only persists the RDD with DISK_ONLY or MEMORY_AND_DISK.
>
> As Kyle mentioned below, the key point is that the CacheManager has
> unrolled the whole Iterator into an ArrayBuffer without a free-memory
> check; we should estimate the size of the unrolled iterator object and
> check whether it is beyond the current free memory size.
>
> We could separate this into three scenarios:
>
> 1. For MEMORY_ONLY, I think it is the normal case to throw an OOM
> exception and require the user to adjust their application.
> 2. For MEMORY_AND_DISK, we should check whether free memory could hold
> the unrolled ArrayBuffer; if yes, it goes down the usual path, and if
> no, we degrade it to DISK_ONLY.
> 3. For DISK_ONLY, I think we need not unroll the whole iterator into an
> ArrayBuffer, because we can write the iterator out to disk one element
> at a time.
>
> So the issue is how to judge whether free memory could hold the
> unrolled iterator before it becomes an ArrayBuffer.
>
> Is there any solution for this? Could we just unroll the first 10% of
> the iterator into an ArrayBuffer, estimate that size, and take the
> total as 10x the size of that 10%? Apparently it is not perfect.
>
> -----Original Message-----
> From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu]
> Sent: Thursday, November 07, 2013 2:59 AM
> To: dev@spark.incubator.apache.org
> Subject: Re: SPARK-942
>
> I think the usage has to be calculated as the iterator is being put
> into the ArrayBuffer.
> Right now the BlockManager, in its put method, when it gets an
> iterator named 'values', uses the simple stanza of:
>
> def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
>     tellMaster: Boolean): Long = {
>   val elements = new ArrayBuffer[Any]
>   elements ++= values
>   put(blockId, elements, level, tellMaster)
> }
>
> completely unrolling the iterator in a single line. Above it, the
> CacheManager does the exact same thing with:
>
> val elements = new ArrayBuffer[Any]
> elements ++= computedValues
> blockManager.put(key, elements, storageLevel, tellMaster = true)
>
> We would probably have to implement some sort of 'IteratorBuffer'
> class, which would wrap an iterator. It would include a method to
> unroll an iterator into a buffer up to a point, something like
>
> def unroll(maxMem: Long): Boolean = { ... }
>
> and it would return true if maxMem was hit, at which point the
> BlockManager could read through the already-cached values, then
> continue on through the rest of the iterator, dumping all the values
> to file. If it unrolled without hitting maxMem (which would probably
> be most of the time), the class would simply wrap the ArrayBuffer of
> cached values.
>
> Kyle
>
>
>
> On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <rx...@apache.org> wrote:
>
> > It's not a very elegant solution, but one possibility is for the
> > CacheManager to check whether it will have enough space. If it is
> > running out of space, it skips buffering the output of the iterator
> > and writes it directly to disk (if the storage level allows that).
> >
> > But it is still tricky to know whether we will run out of space before
> > we even start running the iterator. One possibility is to use sizing
> > data from previous partitions to estimate the size of the current
> > partition (i.e. estimated in-memory size = (average in-memory/input
> > size ratio over previous partitions) * current input size).
> >
> > Do you have any ideas on this one, Kyle?
> >
> >
> > On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <kellrott@soe.ucsc.edu> wrote:
> >
> > > I was wondering if anybody had any thoughts on the best way to tackle
> > > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> > > Basically, Spark takes an iterator from a flatMap call and, because
> > > I tell it that it needs to persist, Spark proceeds to push it all
> > > into an array before deciding that it doesn't have enough memory and
> > > trying to serialize it to disk, and somewhere along the line it runs
> > > out of memory. For my particular operation, the function returns an
> > > iterator that reads data out of a file, and the size of the files
> > > passed to that function can vary greatly (from a few kilobytes to a
> > > few gigabytes). The funny thing is that if I do a straight 'map'
> > > operation after the flatMap, everything works, because Spark just
> > > passes the iterator forward and never tries to expand the whole
> > > thing into memory. But I need to do a reduceByKey across all the
> > > records, so I'd like to persist to disk first, and that is where I
> > > hit this snag.
> > > I've already set up a unit test to replicate the problem, and I
> > > know the area of the code that would need to be fixed.
> > > I'm just hoping for some tips on the best way to fix the problem.
> > >
> > > Kyle
> > >
> >
>

RE: SPARK-942

Posted by "Xia, Junluan" <ju...@intel.com>.
Hi 

I think it is a bad user experience to throw an OOM exception when the user only persists the RDD with DISK_ONLY or MEMORY_AND_DISK.

As Kyle mentioned below, the key point is that the CacheManager has unrolled the whole Iterator into an ArrayBuffer without a free-memory check; we should estimate the size of the unrolled iterator object and check whether it is beyond the current free memory size.

We could separate this into three scenarios:

1. For MEMORY_ONLY, I think it is the normal case to throw an OOM exception and require the user to adjust their application.
2. For MEMORY_AND_DISK, we should check whether free memory could hold the unrolled ArrayBuffer; if yes, it goes down the usual path, and if no, we degrade it to DISK_ONLY.
3. For DISK_ONLY, I think we need not unroll the whole iterator into an ArrayBuffer, because we can write the iterator out to disk one element at a time.

So the issue is how to judge whether free memory could hold the unrolled iterator before it becomes an ArrayBuffer.

Is there any solution for this? Could we just unroll the first 10% of the iterator into an ArrayBuffer, estimate that size, and take the total as 10x the size of that 10%? Apparently it is not perfect.
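
A rough sketch of that three-scenario dispatch (hedged: illustrative
pseudocode against a simplified CacheManager; blockManager.putIterator and
estimateCanUnroll are hypothetical helpers, not existing APIs):

import scala.collection.mutable.ArrayBuffer

def cachePartition(key: BlockId, computedValues: Iterator[Any],
    level: StorageLevel): Iterator[Any] = {
  if (level == StorageLevel.DISK_ONLY) {
    // Scenario 3: never unroll; stream elements straight to disk.
    blockManager.putIterator(key, computedValues, level)
  } else if (level == StorageLevel.MEMORY_AND_DISK &&
             !estimateCanUnroll(computedValues)) {
    // Scenario 2, no room: degrade to disk-only streaming.
    blockManager.putIterator(key, computedValues, StorageLevel.DISK_ONLY)
  } else {
    // Scenario 1 (and scenario 2 with room): unroll into memory as today;
    // an OOM here under MEMORY_ONLY is the user's cue to tune the job.
    val elements = new ArrayBuffer[Any]
    elements ++= computedValues
    blockManager.put(key, elements, level, tellMaster = true)
    elements.iterator
  }
}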

-----Original Message-----
From: Kyle Ellrott [mailto:kellrott@soe.ucsc.edu] 
Sent: Thursday, November 07, 2013 2:59 AM
To: dev@spark.incubator.apache.org
Subject: Re: SPARK-942

I think the usage has to be calculated as the iterator is being put into the ArrayBuffer.
Right now the BlockManager, in its put method, when it gets an iterator named 'values', uses the simple stanza of:

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
    tellMaster: Boolean): Long = {
  val elements = new ArrayBuffer[Any]
  elements ++= values
  put(blockId, elements, level, tellMaster)
}

completely unrolling the iterator in a single line. Above it, the CacheManager does the exact same thing with:

val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)

We would probably have to implement some sort of 'IteratorBuffer' class, which would wrap an iterator. It would include a method to unroll an iterator into a buffer up to a point, something like

def unroll(maxMem: Long): Boolean = { ... }

and it would return true if maxMem was hit, at which point the BlockManager could read through the already-cached values, then continue on through the rest of the iterator, dumping all the values to file. If it unrolled without hitting maxMem (which would probably be most of the time), the class would simply wrap the ArrayBuffer of cached values.

Kyle



On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <rx...@apache.org> wrote:

> It's not a very elegant solution, but one possibility is for the
> CacheManager to check whether it will have enough space. If it is
> running out of space, it skips buffering the output of the iterator and
> writes it directly to disk (if the storage level allows that).
>
> But it is still tricky to know whether we will run out of space before
> we even start running the iterator. One possibility is to use sizing
> data from previous partitions to estimate the size of the current
> partition (i.e. estimated in-memory size = (average in-memory/input
> size ratio over previous partitions) * current input size).
>
> Do you have any ideas on this one, Kyle?
>
>
> On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <kellrott@soe.ucsc.edu> wrote:
>
> > I was wondering if anybody had any thoughts on the best way to tackle
> > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> > Basically, Spark takes an iterator from a flatMap call and, because I
> > tell it that it needs to persist, Spark proceeds to push it all into
> > an array before deciding that it doesn't have enough memory and trying
> > to serialize it to disk, and somewhere along the line it runs out of
> > memory. For my particular operation, the function returns an iterator
> > that reads data out of a file, and the size of the files passed to
> > that function can vary greatly (from a few kilobytes to a few
> > gigabytes). The funny thing is that if I do a straight 'map' operation
> > after the flatMap, everything works, because Spark just passes the
> > iterator forward and never tries to expand the whole thing into
> > memory. But I need to do a reduceByKey across all the records, so I'd
> > like to persist to disk first, and that is where I hit this snag.
> > I've already set up a unit test to replicate the problem, and I know
> > the area of the code that would need to be fixed.
> > I'm just hoping for some tips on the best way to fix the problem.
> >
> > Kyle
> >
>

Re: SPARK-942

Posted by Kyle Ellrott <ke...@soe.ucsc.edu>.
I think the usage has to be calculated as the iterator is being put into
the ArrayBuffer.
Right now the BlockManager, in its put method, when it gets an iterator
named 'values', uses the simple stanza of:

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
    tellMaster: Boolean): Long = {
  val elements = new ArrayBuffer[Any]
  elements ++= values
  put(blockId, elements, level, tellMaster)
}

completely unrolling the iterator in a single line. Above it, the
CacheManager does the exact same thing with:

val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)

We would probably have to implement some sort of 'IteratorBuffer' class,
which would wrap an iterator. It would include a method to unroll an
iterator into a buffer up to a point, something like

def unroll(maxMem: Long): Boolean = { ... }

and it would return true if maxMem was hit, at which point the
BlockManager could read through the already-cached values, then continue
on through the rest of the iterator, dumping all the values to file. If it
unrolled without hitting maxMem (which would probably be most of the
time), the class would simply wrap the ArrayBuffer of cached values.
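
A minimal sketch of what such an 'IteratorBuffer' could look like
(assumptions: the class shape and the use of Spark's SizeEstimator are my
own illustration, not the eventual patch; estimating every element this way
would be slow, so a real version would sample):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.SizeEstimator  // assumes this utility is visible here

// Hypothetical wrapper: unroll an iterator into a buffer until a memory
// cap is hit, then expose the buffered prefix plus the remainder.
class IteratorBuffer[A](iter: Iterator[A]) {
  val buffered = new ArrayBuffer[A]

  // Returns true if maxMem was hit before the iterator was exhausted.
  def unroll(maxMem: Long): Boolean = {
    var used = 0L
    while (iter.hasNext && used < maxMem) {
      val v = iter.next()
      buffered += v
      // Costly per-element estimate; shown for clarity only.
      used += SizeEstimator.estimate(v.asInstanceOf[AnyRef])
    }
    iter.hasNext
  }

  // The buffered prefix followed by whatever remains un-consumed.
  def iterator: Iterator[A] = buffered.iterator ++ iter
}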

Kyle



On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <rx...@apache.org> wrote:

> It's not a very elegant solution, but one possibility is for the
> CacheManager to check whether it will have enough space. If it is running
> out of space, it skips buffering the output of the iterator and writes it
> directly to disk (if the storage level allows that).
>
> But it is still tricky to know whether we will run out of space before we
> even start running the iterator. One possibility is to use sizing data from
> previous partitions to estimate the size of the current partition (i.e.
> estimated in-memory size = (average in-memory/input size ratio over
> previous partitions) * current input size).
>
> Do you have any ideas on this one, Kyle?
>
>
> On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <kellrott@soe.ucsc.edu> wrote:
>
> > I was wondering if anybody had any thoughts on the best way to tackle
> > SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> > Basically, Spark takes an iterator from a flatMap call and, because I tell
> > it that it needs to persist, Spark proceeds to push it all into an array
> > before deciding that it doesn't have enough memory and trying to serialize
> > it to disk, and somewhere along the line it runs out of memory. For my
> > particular operation, the function returns an iterator that reads data out
> > of a file, and the size of the files passed to that function can vary
> > greatly (from a few kilobytes to a few gigabytes). The funny thing is that
> > if I do a straight 'map' operation after the flatMap, everything works,
> > because Spark just passes the iterator forward and never tries to expand
> > the whole thing into memory. But I need to do a reduceByKey across all the
> > records, so I'd like to persist to disk first, and that is where I hit this
> > snag.
> > I've already set up a unit test to replicate the problem, and I know the
> > area of the code that would need to be fixed.
> > I'm just hoping for some tips on the best way to fix the problem.
> >
> > Kyle
> >
>

Re: SPARK-942

Posted by Reynold Xin <rx...@apache.org>.
It's not a very elegant solution, but one possibility is for the
CacheManager to check whether it will have enough space. If it is running
out of space, it skips buffering the output of the iterator and writes it
directly to disk (if the storage level allows that).

But it is still tricky to know whether we will run out of space before we
even start running the iterator. One possibility is to use sizing data from
previous partitions to estimate the size of the current partition (i.e.
estimated in-memory size = (average in-memory/input size ratio over
previous partitions) * current input size).
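
For concreteness, a tiny sketch of that ratio-based estimate (a
hypothetical helper, not anything in the codebase):

// Hypothetical: track the in-memory/input-bytes ratio of completed
// partitions and use the running ratio to predict the next one.
class PartitionSizeStats {
  private var totalInMemory = 0L
  private var totalInput = 0L

  def record(inMemoryBytes: Long, inputBytes: Long): Unit = {
    totalInMemory += inMemoryBytes
    totalInput += inputBytes
  }

  // Estimated in-memory size for a partition with the given input size.
  def estimate(inputBytes: Long): Long =
    if (totalInput == 0) inputBytes // no history yet: assume 1:1
    else (inputBytes * (totalInMemory.toDouble / totalInput)).toLong
}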

Do you have any ideas on this one, Kyle?


On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <ke...@soe.ucsc.edu> wrote:

> I was wondering if anybody had any thoughts on the best way to tackle
> SPARK-942 ( https://spark-project.atlassian.net/browse/SPARK-942 ).
> Basically, Spark takes an iterator from a flatMap call and, because I tell
> it that it needs to persist, Spark proceeds to push it all into an array
> before deciding that it doesn't have enough memory and trying to serialize
> it to disk, and somewhere along the line it runs out of memory. For my
> particular operation, the function returns an iterator that reads data out
> of a file, and the size of the files passed to that function can vary
> greatly (from a few kilobytes to a few gigabytes). The funny thing is that
> if I do a straight 'map' operation after the flatMap, everything works,
> because Spark just passes the iterator forward and never tries to expand
> the whole thing into memory. But I need to do a reduceByKey across all the
> records, so I'd like to persist to disk first, and that is where I hit this
> snag.
> I've already set up a unit test to replicate the problem, and I know the
> area of the code that would need to be fixed.
> I'm just hoping for some tips on the best way to fix the problem.
>
> Kyle
>