Posted to dev@spark.apache.org by Andrew Ash <an...@andrewash.com> on 2014/06/05 19:29:17 UTC

Implementing rdd.scanLeft()

I have a use case that would greatly benefit from RDDs having a .scanLeft()
method.  Are the project developers interested in adding this to the public
API?


Looking through past message traffic, this has come up a few times.  The
recommendation from the list has been to implement a parallel prefix
scan:

http://comments.gmane.org/gmane.comp.lang.scala.spark.user/1880
https://groups.google.com/forum/#!topic/spark-users/ts-FdB50ltY

The algorithm Reynold sketched in the first link leads to this working
implementation:

val vector = sc.parallelize(1 to 20, 3)

// First pass: total each partition, then scan those totals on the driver
// to get each partition's starting offset.
val sums = vector
  .mapPartitions(iter => Iterator(iter.sum))
  .collect
  .scanLeft(0)(_ + _)

// Second pass: scan each partition locally, seeded with its offset.
val prefixScan = vector.mapPartitionsWithIndex { case (partition, iter) =>
  val base = sums(partition)
  iter.scanLeft(base)(_ + _).drop(1)
}.collect
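
For this example the result lines up with a local scan (minus the leading
zero, which each partition's drop(1) removes):

// Quick sanity check against the single-machine scanLeft.
assert(prefixScan.sameElements((1 to 20).scanLeft(0)(_ + _).drop(1)))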


I'd love to have that replaced with this:

val vector = sc.parallelize(1 to 20, 3)
val cumSum: RDD[Int] = vector.scanLeft(0)(_+_)
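
For what it's worth, here's a rough sketch of what that could look like as
an enrichment on RDD (the ScanLeftRDD name is just a placeholder, nothing
in Spark).  One catch versus the collections' scanLeft: to split the scan
across partitions, op needs to be associative with zero as its identity:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

implicit class ScanLeftRDD[T: ClassTag](self: RDD[T]) {
  def scanLeft(zero: T)(op: (T, T) => T): RDD[T] = {
    // Pass 1: reduce each partition, then scan the partial results on the
    // driver to get each partition's starting value.
    val bases = self
      .mapPartitions(iter => Iterator(iter.foldLeft(zero)(op)))
      .collect()
      .scanLeft(zero)(op)

    // Pass 2: scan each partition locally, seeded with its base; only the
    // first partition emits the initial zero.
    self.mapPartitionsWithIndex { case (partition, iter) =>
      val init: Iterator[T] =
        if (partition == 0) Iterator.single(zero) else Iterator.empty
      init ++ iter.scanLeft(bases(partition))(op).drop(1)
    }
  }
}

With that in scope, vector.scanLeft(0)(_ + _) yields 0, 1, 3, 6, ..., 210.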


Any thoughts on whether this contribution would be accepted?  What pitfalls
exist that I should be thinking about?

Thanks!
Andrew

Re: Implementing rdd.scanLeft()

Posted by Andrew Ash <an...@andrewash.com>.
Is that something that documentation on the method can solve?
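
Something along these lines, say (strawman wording):

/**
 * Cumulative scan of this RDD, analogous to scanLeft on Scala collections.
 *
 * Note: unlike most transformations, this makes two passes over the data:
 * one to compute per-partition offsets and one to produce the scanned
 * output.  Cache the input if recomputing it is expensive.
 */
def scanLeft(zero: T)(op: (T, T) => T): RDD[T]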


On Thu, Jun 5, 2014 at 10:47 AM, Reynold Xin <rx...@databricks.com> wrote:

> I think the main concern is that this would require scanning the data
> twice, and maybe the user should be aware of it ...

Re: Implementing rdd.scanLeft()

Posted by Reynold Xin <rx...@databricks.com>.
I think the main concern is that this would require scanning the data
twice, and maybe the user should be aware of it ...
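
The input ends up scanned once for the per-partition offsets and again for
the output (or recomputed, if it isn't cached), so the user would probably
want something like:

// Hypothetical usage, assuming a scanLeft along the lines sketched above.
val vector = sc.parallelize(1 to 20, 3).cache()  // avoid recomputing the input on pass 2
val cumSum = vector.scanLeft(0)(_ + _)           // pass 1 runs eagerly; pass 2 on first action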

