Posted to dev@spark.apache.org by Andrew Ash <an...@andrewash.com> on 2015/02/18 16:09:39 UTC

Streaming partitions to driver for use in .toLocalIterator

Hi Spark devs,

I'm building streaming export functionality for RDDs and am having some
trouble with large partitions.  The RDD.toLocalIterator() call pulls one
partition at a time over to the driver, and then streams the RDD out from
that partition before pulling in the next one.  With large partitions,
though, you can OOM the driver, especially when several of these exports
are running in the same SparkContext.
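
A minimal sketch of why partition size bounds driver memory, written in
plain Python rather than against Spark. The function name
to_local_iterator_model and the stats dictionary are hypothetical; the model
assumes, as described above, that each partition is materialized in full on
the driver before its elements are handed out.

```python
from typing import Iterator, List


def to_local_iterator_model(partitions: List[List[int]], stats: dict) -> Iterator[int]:
    """Yield every element, fetching one whole partition at a time."""
    stats["peak_buffered"] = 0
    for partition in partitions:
        # Model of the executor -> driver transfer: the whole partition
        # arrives as one materialized buffer (an assumption of this sketch).
        buffered = list(partition)
        stats["peak_buffered"] = max(stats["peak_buffered"], len(buffered))
        for element in buffered:
            yield element


partitions = [[1, 2], [3, 4, 5, 6, 7], [8]]
stats = {}
exported = list(to_local_iterator_model(partitions, stats))
```

In this model the peak buffer equals the size of the largest partition, no
matter how slowly the consumer drains the iterator, which is the behaviour
that hurts when several exports share one driver.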

One idea I had was to repartition the RDD so partitions are smaller, but
it's hard to know a priori what the partition count should be, and I'd like
to avoid paying the shuffle cost if possible -- I think repartition to a
higher partition count forces a shuffle.

Is it feasible to rework this so the executor -> driver transfer in
.toLocalIterator is a steady stream rather than a partition at a time?

Thanks!
Andrew

Re: Streaming partitions to driver for use in .toLocalIterator

Posted by Andrew Ash <an...@andrewash.com>.
I think a cheap way to repartition to a higher partition count without a
shuffle would be valuable too.  Right now you can choose whether to execute
a shuffle when going down in partition count, but going up in partition
count always requires a shuffle.  Making partitions smaller so that
.toLocalIterator is more memory-efficient does not inherently need a
shuffle.
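
One way to picture a shuffle-free increase in partition count, as a plain
Python sketch rather than Spark code. The helper split_partitions is
hypothetical, and it assumes each parent's length is known up front; a real
RDD would probably have to recompute or skip through the parent iterator to
find its slice.

```python
from typing import List, Sequence


def split_partitions(partitions: Sequence[Sequence[int]], factor: int) -> List[List[int]]:
    """Turn N partitions into N * factor partitions without a shuffle.

    Child partition (i * factor + j) reads only slice j of parent i, so no
    element ever moves between parents.
    """
    if factor < 1:
        raise ValueError("factor must be >= 1")
    children = []
    for parent in partitions:
        size = len(parent)
        for j in range(factor):
            # Integer arithmetic keeps the slices contiguous and gap-free.
            start = j * size // factor
            end = (j + 1) * size // factor
            children.append(list(parent[start:end]))
    return children


parents = [[1, 2, 3, 4], [5, 6, 7]]
children = split_partitions(parents, 2)
```

Each child depends on exactly one parent, so element order is preserved and
the largest partition shrinks by roughly the chosen factor.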

Filed as https://issues.apache.org/jira/browse/SPARK-5997

On Wed, Feb 18, 2015 at 3:21 PM, Mingyu Kim <mk...@palantir.com> wrote:

> Another alternative would be to compress the partition in memory in a
> streaming fashion instead of calling .toArray on the iterator. Would it be
> an easier mitigation to the problem? Or, is it hard to compress the rows
> one by one without materializing the full partition in memory using the
> compression algo Spark uses currently?
>
> Mingyu

Re: Streaming partitions to driver for use in .toLocalIterator

Posted by Mingyu Kim <mk...@palantir.com>.
Another alternative would be to compress the partition in memory in a
streaming fashion instead of calling .toArray on the iterator. Would it be
an easier mitigation to the problem? Or, is it hard to compress the rows
one by one without materializing the full partition in memory using the
compression algo Spark uses currently?
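
As a rough illustration that rows can be compressed one by one without
holding the whole partition, here is a sketch using Python's zlib streaming
objects. zlib only stands in for whatever codec Spark is configured with,
and the helpers compress_rows and decompress_rows, along with the 4-byte
length prefix, are assumptions of this example.

```python
import zlib
from typing import Iterable, Iterator


def compress_rows(rows: Iterable[bytes]) -> Iterator[bytes]:
    """Compress rows one at a time, yielding compressed chunks as they fill."""
    compressor = zlib.compressobj()
    for row in rows:
        # Length-prefix each row so row boundaries survive decompression.
        framed = len(row).to_bytes(4, "big") + row
        chunk = compressor.compress(framed)
        if chunk:
            yield chunk
    # Emit whatever the compressor is still holding internally.
    yield compressor.flush()


def decompress_rows(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Inverse of compress_rows: recover the original rows lazily."""
    decompressor = zlib.decompressobj()
    pending = b""
    for chunk in chunks:
        pending += decompressor.decompress(chunk)
        while len(pending) >= 4:
            size = int.from_bytes(pending[:4], "big")
            if len(pending) < 4 + size:
                break  # Row not complete yet; wait for more data.
            yield pending[4:4 + size]
            pending = pending[4 + size:]


rows = [b"alpha", b"", b"gamma" * 100]
round_tripped = list(decompress_rows(compress_rows(iter(rows))))
```

Only the compressor's internal window and the current row are held
uncompressed at any time; the compressed output still grows with the
partition, so this lowers the peak rather than removing it.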

Mingyu

On 2/18/15, 1:01 PM, "Imran Rashid" <ir...@cloudera.com> wrote:

>This would be pretty tricky to do -- the issue is that right now
>sparkContext.runJob has you pass in a function from a partition to *one*
>result object that gets serialized and sent back: Iterator[T] => U, and
>that idea is baked pretty deep into a lot of the internals, DAGScheduler,
>Task, Executors, etc.
>
>Maybe another possibility worth considering: should we make it easy to go
>from N partitions to 2N partitions (or any other multiple obviously)
>without requiring a shuffle?  For that matter, you should also be able to
>go from 2N to N without a shuffle as well.  That change is also somewhat
>involved, though.
>
>Both are in theory possible, but I imagine they'd need really compelling
>use cases.
>
>An alternative would be to write your RDD to some other data store (e.g.,
>HDFS) which has better support for reading data in a streaming fashion,
>though you would probably be unhappy with the overhead.

Re: Streaming partitions to driver for use in .toLocalIterator

Posted by Imran Rashid <ir...@cloudera.com>.
This would be pretty tricky to do -- the issue is that right now
sparkContext.runJob has you pass in a function from a partition to *one*
result object that gets serialized and sent back: Iterator[T] => U, and
that idea is baked pretty deep into a lot of the internals, DAGScheduler,
Task, Executors, etc.
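
To make the "one result object per partition" contract concrete, here is a
small Python model of it. run_job_model is a hypothetical stand-in, not
Spark's API; it only mirrors the Iterator[T] => U shape described above.

```python
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def run_job_model(
    partitions: Sequence[Sequence[T]],
    func: Callable[[Iterator[T]], U],
    partition_ids: Sequence[int],
) -> List[U]:
    """Apply func to each requested partition; each task returns exactly one U."""
    return [func(iter(partitions[i])) for i in partition_ids]


data = [[1, 2, 3], [4, 5], [6]]

# One number per partition: a small result that is cheap to send back.
sums = run_job_model(data, sum, [0, 1, 2])

# toLocalIterator-style fetch: the single result is the whole partition.
whole_partition = run_job_model(data, list, [1])[0]
```

Because the task hands back a single value, a function that returns the
entire partition forces that partition to exist as one object on the
receiving side, which is why a steady stream does not fit this shape.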

Maybe another possibility worth considering: should we make it easy to go
from N partitions to 2N partitions (or any other multiple, obviously)
without requiring a shuffle?  For that matter, you should also be able to
go from 2N to N without a shuffle as well.  That change is also somewhat
involved, though.
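
The 2N-to-N direction can be sketched as grouping adjacent partitions and
chaining their iterators, shown here in plain Python. merge_adjacent is a
hypothetical helper; it ignores data locality, which a real implementation
would likely need to consider.

```python
from itertools import chain
from typing import Iterator, List, Sequence


def merge_adjacent(partitions: Sequence[Sequence[int]], group: int) -> List[Iterator[int]]:
    """Merge every `group` adjacent partitions into one lazy partition."""
    if group < 1:
        raise ValueError("group must be >= 1")
    return [
        # Chaining reads the parents in order without copying their data.
        chain.from_iterable(partitions[start:start + group])
        for start in range(0, len(partitions), group)
    ]


four = [[1], [2, 3], [4], [5, 6]]
two = [list(merged) for merged in merge_adjacent(four, 2)]
```

Each merged partition depends on a fixed, small set of parents, so no data
needs to be redistributed by key.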

Both are in theory possible, but I imagine they'd need really compelling
use cases.

An alternative would be to write your RDD to some other data store (e.g.,
HDFS) which has better support for reading data in a streaming fashion,
though you would probably be unhappy with the overhead.
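
The write-then-stream alternative looks roughly like this, with a local
temporary directory standing in for HDFS. The helpers write_partitions and
stream_records and the part-NNNNN file naming are assumptions of this
sketch, and records are assumed to contain no newlines.

```python
import os
import tempfile
from typing import Iterator, List, Sequence


def write_partitions(partitions: Sequence[Sequence[str]], directory: str) -> List[str]:
    """Write each partition to its own file, one record per line."""
    paths = []
    for index, partition in enumerate(partitions):
        path = os.path.join(directory, f"part-{index:05d}")
        with open(path, "w", encoding="utf-8") as handle:
            for record in partition:
                handle.write(record + "\n")
        paths.append(path)
    return paths


def stream_records(paths: Sequence[str]) -> Iterator[str]:
    """Read records back lazily, holding only one line in memory at a time."""
    for path in paths:
        with open(path, "r", encoding="utf-8") as handle:
            for line in handle:
                yield line.rstrip("\n")


with tempfile.TemporaryDirectory() as directory:
    paths = write_partitions([["a", "b"], ["c"]], directory)
    streamed = list(stream_records(paths))
```

The reader's memory use no longer depends on partition size, at the price
of the extra write and read that the message above calls overhead.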


