Posted to user@spark.apache.org by Gerard Maas <ge...@gmail.com> on 2014/06/02 22:13:16 UTC

How to create RDDs from another RDD?

The RDD API has functions to join multiple RDDs, such as PairRDD.join
or PairRDD.cogroup, that take another RDD as input, e.g.
 firstRDD.join(secondRDD)

I'm looking for ways to do the opposite: split an existing RDD. What is the
right way to create derived RDDs from an existing RDD?

e.g. imagine I have a collection of pairs as input: colRDD =
 (k1 -> v1), ..., (kx -> vy), ...
I could do:
val byKey = colRDD.groupByKey()  // = (k1 -> (v1, ..., vn)), ..., (kn -> (vy, ...))

Now, I'd like to create an RDD from the values to have something like:

val groupedRDDs = (k1 -> RDD(v1, ..., vn), ..., kn -> RDD(vy, ...))

In this example there's a function f such that f(byKey) = groupedRDDs. What is that f?

Would byKey.map{ case (k, v) => k -> sc.makeRDD(v.toSeq) } be the
right/recommended way to do this?  Any other options?
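
One driver-side alternative I can think of (a minimal sketch with
illustrative names; it assumes the set of distinct keys is small enough to
collect, and each derived RDD re-scans the parent):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Split an RDD of pairs into one RDD per key, entirely on the driver.
// Caching the parent matters because every derived RDD re-scans it.
def splitByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): Map[K, RDD[V]] = {
  rdd.cache()
  val keys = rdd.map(_._1).distinct().collect()
  keys.map(k => k -> rdd.filter(_._1 == k).map(_._2)).toMap
}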

Thanks,

Gerard.

Re: How to create RDDs from another RDD?

Posted by Andrew Ash <an...@andrewash.com>.
Hmm that sounds like it could be done in a custom OutputFormat, but I'm not
familiar enough with custom OutputFormats to say that's the right thing to
do.
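
For reference, a rough sketch of what that might look like with Hadoop's
MultipleTextOutputFormat (untested; assumes String keys and values, and
pairsRDD is a stand-in name):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Route each record to an output file named after its key.
class KeyBasedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString
}

// illustrative usage: one output file per key under /out
pairsRDD.saveAsHadoopFile("/out", classOf[String], classOf[String],
  classOf[KeyBasedOutputFormat])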


On Tue, Jun 3, 2014 at 10:23 AM, Gerard Maas <ge...@gmail.com> wrote:

> Hi Andrew,
>
> Thanks for your answer.
>
> The reason for the question: I've been trying to contribute to the
> community by helping answer Spark-related questions on Stack Overflow.
>
> (A note on that: given the growing volume on the user list lately, I think
> it will need to scale out to other venues, so helping on SO should further
> support Spark's mainstream adoption.)
>
> I came across this question [1] on how to save parts of an RDD to
> different HDFS files. I looked into the implementation of saveAsTextFile.
> The delegation path terminates in PairRDD.saveAsHadoopDataset, and the
> implementation looks quite tightly coupled to the RDD data, so potentially
> the easiest way to solve the problem at hand is to create several RDDs
> from the original RDD.
>
> The issue I see is that 'sc.makeRDD(v.toSeq)' will potentially blow up
> when materializing the iterable into a Seq.  I also don't know how that
> call to the SparkContext would behave on a remote worker.
>
> My current conclusion is that the best option would be to roll my own
> saveHdfsFile(...)
>
> Would you agree?
>
> -greetz, Gerard.
>
>
> [1]
> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>
>
>
>
> On Mon, Jun 2, 2014 at 11:44 PM, Andrew Ash <an...@andrewash.com> wrote:
>
>> Hi Gerard,
>>
>> Usually when I want to split one RDD into several, I'm better off
>> re-thinking the algorithm to do all the computation at once.  Example:
>>
>> Suppose you had a dataset that was the tuple (URL, webserver,
>> pageSizeBytes), and you wanted to find out the average page size that each
>> webserver (e.g. Apache, nginx, IIS, etc) served.  Rather than splitting
>> your allPagesRDD into an RDD for each webserver, like nginxRDD, apacheRDD,
>> IISRDD, it's probably better to do the average computation over all at
>> once, like this:
>>
>> // allPagesRDD is (URL, webserver, pageSizeBytes)
>> allPagesRDD.keyBy(getWebserver)            // -> (webserver, page)
>>   .mapValues(p => (p.pageSizeBytes, 1L))   // -> (webserver, (bytes, count))
>>   .reduceByKey( (a, b) => (a._1 + b._1, a._2 + b._2) )
>>   .mapValues( v => v._1.toDouble / v._2 )  // average bytes per webserver
>>
>> For this example you could use something like Summingbird to keep from
>> doing the average tracking yourself.
>>
>> Can you go into more detail about why you want to split one RDD into
>> several?
>>
>>
>> On Mon, Jun 2, 2014 at 1:13 PM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> The RDD API has functions to join multiple RDDs, such as PairRDD.join
>>> or PairRDD.cogroup, that take another RDD as input, e.g.
>>>  firstRDD.join(secondRDD)
>>>
>>> I'm looking for ways to do the opposite: split an existing RDD. What is
>>> the right way to create derived RDDs from an existing RDD?
>>>
>>> e.g. imagine I have a collection of pairs as input: colRDD =
>>>  (k1 -> v1), ..., (kx -> vy), ...
>>> I could do:
>>> val byKey = colRDD.groupByKey()
>>> // = (k1 -> (v1, ..., vn)), ..., (kn -> (vy, ...))
>>>
>>> Now, I'd like to create an RDD from the values to have something like:
>>>
>>> val groupedRDDs = (k1 -> RDD(v1, ..., vn), ..., kn -> RDD(vy, ...))
>>>
>>> In this example there's a function f such that f(byKey) = groupedRDDs.
>>> What is that f?
>>>
>>> Would byKey.map{ case (k, v) => k -> sc.makeRDD(v.toSeq) } be the
>>> right/recommended way to do this?  Any other options?
>>>
>>> Thanks,
>>>
>>> Gerard.
>>>
>>
>>
>

Re: How to create RDDs from another RDD?

Posted by Gerard Maas <ge...@gmail.com>.
Hi Andrew,

Thanks for your answer.

The reason for the question: I've been trying to contribute to the community
by helping answer Spark-related questions on Stack Overflow.

(A note on that: given the growing volume on the user list lately, I think it
will need to scale out to other venues, so helping on SO should further
support Spark's mainstream adoption.)

I came across this question [1] on how to save parts of an RDD to different
HDFS files. I looked into the implementation of saveAsTextFile. The
delegation path terminates in PairRDD.saveAsHadoopDataset, and the
implementation looks quite tightly coupled to the RDD data, so potentially
the easiest way to solve the problem at hand is to create several RDDs from
the original RDD.

The issue I see is that 'sc.makeRDD(v.toSeq)' will potentially blow up
when materializing the iterable into a Seq.  I also don't know how that call
to the SparkContext would behave on a remote worker.

My current conclusion is that the best option would be to roll my own
saveHdfsFile(...), along the lines of the sketch below.
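
(A rough sketch of the kind of thing I mean, assuming String keys and values;
the paths are illustrative, and writing one file per key per partition avoids
concurrent writers on the same path:)

import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD

def saveHdfsFile(rdd: RDD[(String, String)], baseDir: String): Unit =
  rdd.foreachPartition { iter =>
    val fs = FileSystem.get(new Configuration())
    // group this partition's records by key and write each group to its own file
    iter.toSeq.groupBy(_._1).foreach { case (key, pairs) =>
      val out = fs.create(new Path(s"$baseDir/$key/part-${UUID.randomUUID}"))
      try pairs.foreach { case (_, v) => out.writeBytes(v + "\n") }
      finally out.close()
    }
  }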

Would you agree?

-greetz, Gerard.


[1]
http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job




On Mon, Jun 2, 2014 at 11:44 PM, Andrew Ash <an...@andrewash.com> wrote:

> Hi Gerard,
>
> Usually when I want to split one RDD into several, I'm better off
> re-thinking the algorithm to do all the computation at once.  Example:
>
> Suppose you had a dataset that was the tuple (URL, webserver,
> pageSizeBytes), and you wanted to find out the average page size that each
> webserver (e.g. Apache, nginx, IIS, etc) served.  Rather than splitting
> your allPagesRDD into an RDD for each webserver, like nginxRDD, apacheRDD,
> IISRDD, it's probably better to do the average computation over all at
> once, like this:
>
> // allPagesRDD is (URL, webserver, pageSizeBytes)
> allPagesRDD.keyBy(getWebserver)            // -> (webserver, page)
>   .mapValues(p => (p.pageSizeBytes, 1L))   // -> (webserver, (bytes, count))
>   .reduceByKey( (a, b) => (a._1 + b._1, a._2 + b._2) )
>   .mapValues( v => v._1.toDouble / v._2 )  // average bytes per webserver
>
> For this example you could use something like Summingbird to keep from
> doing the average tracking yourself.
>
> Can you go into more detail about why you want to split one RDD into
> several?
>
>
> On Mon, Jun 2, 2014 at 1:13 PM, Gerard Maas <ge...@gmail.com> wrote:
>
>> The RDD API has functions to join multiple RDDs, such as PairRDD.join
>> or PairRDD.cogroup, that take another RDD as input, e.g.
>>  firstRDD.join(secondRDD)
>>
>> I'm looking for ways to do the opposite: split an existing RDD. What is
>> the right way to create derived RDDs from an existing RDD?
>>
>> e.g. imagine I have a collection of pairs as input: colRDD =
>>  (k1 -> v1), ..., (kx -> vy), ...
>> I could do:
>> val byKey = colRDD.groupByKey()
>> // = (k1 -> (v1, ..., vn)), ..., (kn -> (vy, ...))
>>
>> Now, I'd like to create an RDD from the values to have something like:
>>
>> val groupedRDDs = (k1 -> RDD(v1, ..., vn), ..., kn -> RDD(vy, ...))
>>
>> In this example there's a function f such that f(byKey) = groupedRDDs.
>> What is that f?
>>
>> Would byKey.map{ case (k, v) => k -> sc.makeRDD(v.toSeq) } be the
>> right/recommended way to do this?  Any other options?
>>
>> Thanks,
>>
>> Gerard.
>>
>
>

Re: How to create RDDs from another RDD?

Posted by Andrew Ash <an...@andrewash.com>.
Hi Gerard,

Usually when I want to split one RDD into several, I'm better off
re-thinking the algorithm to do all the computation at once.  Example:

Suppose you had a dataset that was the tuple (URL, webserver,
pageSizeBytes), and you wanted to find out the average page size that each
webserver (e.g. Apache, nginx, IIS, etc) served.  Rather than splitting
your allPagesRDD into an RDD for each webserver, like nginxRDD, apacheRDD,
IISRDD, it's probably better to do the average computation over all at
once, like this:

// allPagesRDD is (URL, webserver, pageSizeBytes)
allPagesRDD.keyBy(getWebserver)            // -> (webserver, page)
  .mapValues(p => (p.pageSizeBytes, 1L))   // -> (webserver, (bytes, count))
  .reduceByKey( (a, b) => (a._1 + b._1, a._2 + b._2) )
  .mapValues( v => v._1.toDouble / v._2 )  // average bytes per webserver

For this example you could use something like Summingbird to keep from
doing the average tracking yourself.
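
For what it's worth, the averaging monoid itself lives in Algebird (which
Summingbird builds on). A rough sketch of that, with the exact API an
assumption from memory, reusing the illustrative getWebserver and
pageSizeBytes from the example above:

import com.twitter.algebird.{AveragedValue, AveragedGroup}

// AveragedValue carries (count, mean); AveragedGroup combines two of them,
// so the reduce does the sum-and-count bookkeeping for us.
allPagesRDD.keyBy(getWebserver)
  .mapValues(p => AveragedValue(1L, p.pageSizeBytes.toDouble))
  .reduceByKey((a, b) => AveragedGroup.plus(a, b))
  .mapValues(_.value)  // the mean page size per webserver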

Can you go into more detail about why you want to split one RDD into
several?


On Mon, Jun 2, 2014 at 1:13 PM, Gerard Maas <ge...@gmail.com> wrote:

> The RDD API has functions to join multiple RDDs, such as PairRDD.join
> or PairRDD.cogroup, that take another RDD as input, e.g.
>  firstRDD.join(secondRDD)
>
> I'm looking for ways to do the opposite: split an existing RDD. What is
> the right way to create derived RDDs from an existing RDD?
>
> e.g. imagine I have a collection of pairs as input: colRDD =
>  (k1 -> v1), ..., (kx -> vy), ...
> I could do:
> val byKey = colRDD.groupByKey()
> // = (k1 -> (v1, ..., vn)), ..., (kn -> (vy, ...))
>
> Now, I'd like to create an RDD from the values to have something like:
>
> val groupedRDDs = (k1 -> RDD(v1, ..., vn), ..., kn -> RDD(vy, ...))
>
> In this example there's a function f such that f(byKey) = groupedRDDs.
> What is that f?
>
> Would byKey.map{ case (k, v) => k -> sc.makeRDD(v.toSeq) } be the
> right/recommended way to do this?  Any other options?
>
> Thanks,
>
> Gerard.
>