Posted to user@spark.apache.org by Arun Ahuja <aa...@gmail.com> on 2014/09/23 23:08:34 UTC

General question on persist

I have a general question on when persisting will be beneficial and when it
won't:

I have a task that runs as follows

keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces)))
partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)

partitioned.mapPartitions(doComputation).save()

Is there value in having a persist somewhere here? For example, if the
flatMap step is particularly expensive, will it ever be computed twice when
there are no failures?

Thanks

Arun

Re: General question on persist

Posted by Liquan Pei <li...@gmail.com>.
For sortByKey, I need to do more reading before I can answer your question.

For your second question, you can call partitioned.cache() to avoid
shuffling the data multiple times.
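
Roughly, as a sketch using the placeholder names from your example
(records, KeyPartitioner, doComputation, doOtherComputation, and save()
all stand in for your actual code):

val keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces)))
val partitioned = keyedRecordPieces.partitionBy(KeyPartitioner).cache()

// The first action computes the flatMap, shuffles, and caches the result;
// the second action reads the cached partitions instead of reshuffling.
partitioned.mapPartitions(doComputation).save()
partitioned.mapPartitions(doOtherComputation).save()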

-Liquan

On Tue, Sep 23, 2014 at 6:08 PM, Arun Ahuja <aa...@gmail.com> wrote:

> Thanks for the answers - from what I've read, doing a sortByKey still does
> not give you any guarantees on the order of the iterator in mapPartitions?
>
> Also, one more question on the first example with persist:
>
> keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces)))
> partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)
>
> partitioned.mapPartitions(doComputation).save()
>
> You mentioned persisting keyedRecordPieces; however, suppose I had the
> following:
>
> keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces)))
> partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)
>
> partitioned.mapPartitions(doComputation).save()
> partitioned.mapPartitions(doOtherComputation).save()
>
> Will this force two shuffles of the data? How can I guarantee that the
> data is only reshuffled once?
>
> Thanks,
> Arun
>
>
>
> On Tue, Sep 23, 2014 at 5:58 PM, Liquan Pei <li...@gmail.com> wrote:
>
>> If you only do the computation once, there will be no difference.
>>
>> For mapPartitions
>> 1) All of the records of the Iterator[T] that a single function call in
>> mapPartitions processes must fit into memory, correct?
>> It depends on what you do with the elements in that partition. If you
>> collect every element in mapPartitions, then yes, all the records need to
>> fit in available memory. However, if you perform an aggregation (e.g. SUM
>> or MIN) over the elements from the iterator, only a small amount of memory
>> is needed.
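>>
>> For instance, a running aggregation like this sketch (assuming a
>> hypothetical RDD named pairs holding (key, value) tuples with Long
>> values) keeps only the accumulator in memory:
>>
>> val sums = pairs.mapPartitions { iter =>
>>   // foldLeft consumes the iterator one element at a time, so only the
>>   // running total is held in memory, never the whole partition.
>>   val total = iter.foldLeft(0L) { case (acc, (_, v)) => acc + v }
>>   Iterator(total)
>> }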
>>
>> Talking about memory: by default, on each executor, 60% of the memory is
>> used for RDD caching and the rest is used for other data structures in
>> your computation.
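>>
>> If you need to tune that split, it is controlled by the
>> spark.storage.memoryFraction setting (Spark 1.x), for example:
>>
>> import org.apache.spark.SparkConf
>>
>> // 0.6 is the default; lower it to leave more room for your computation.
>> val conf = new SparkConf().set("spark.storage.memoryFraction", "0.6")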
>>
>> 2) Is there some way to process that iterator in sorted order?
>> My understanding is that there is no direct way to do this. You can use
>> mapPartitions, collect the elements from the iterator, and sort them in
>> memory. Or you can call sortByKey on your RDD first and then process your
>> data using mapPartitions. A reference for sortByKey is available at
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions
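>>
>> For example, a sketch reusing the keyedRecordPieces, doComputation, and
>> save() placeholders from earlier in the thread:
>>
>> // sortByKey range-partitions the data and sorts within each partition,
>> // so a mapPartitions applied directly to its result should see keys in
>> // sorted order.
>> val sorted = keyedRecordPieces.sortByKey()
>> sorted.mapPartitions(doComputation).save()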
>>
>> Liquan
>>
>> On Tue, Sep 23, 2014 at 2:41 PM, Arun Ahuja <aa...@gmail.com> wrote:
>>
>>> Thanks Liquan, that makes sense, but if I am only doing the computation
>>> once, there will essentially be no difference, correct?
>>>
>>> I had a second question related to mapPartitions:
>>> 1) All of the records of the Iterator[T] that a single function call in
>>> mapPartitions processes must fit into memory, correct?
>>> 2) Is there some way to process that iterator in sorted order?
>>>
>>> Thanks!
>>>
>>> Arun
>>>
>>> On Tue, Sep 23, 2014 at 5:21 PM, Liquan Pei <li...@gmail.com> wrote:
>>>
>>>> Hi Arun,
>>>>
>>>> The intermediate results like keyedRecordPieces will not be
>>>> materialized. This means that if you run
>>>>
>>>> partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)
>>>>
>>>> partitioned.mapPartitions(doComputation).save()
>>>>
>>>> again, keyedRecordPieces will be re-computed. In this case, caching or
>>>> persisting keyedRecordPieces is a good idea to eliminate unnecessary
>>>> expensive computation. What you can probably do is
>>>>
>>>> keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces))).cache()
>>>>
>>>> which will cache the RDD referenced by keyedRecordPieces in memory. For
>>>> more options on cache and persist, take a look at
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD.
>>>> There are two APIs you can use to persist RDDs, one of which allows you
>>>> to specify the storage level.
>>>>
>>>> Thanks,
>>>> Liquan
>>>>
>>>>
>>>>
>>>> On Tue, Sep 23, 2014 at 2:08 PM, Arun Ahuja <aa...@gmail.com> wrote:
>>>>
>>>>> I have a general question on when persisting will be beneficial and
>>>>> when it won't:
>>>>>
>>>>> I have a task that runs as follows
>>>>>
>>>>> keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces)))
>>>>> partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)
>>>>>
>>>>> partitioned.mapPartitions(doComputation).save()
>>>>>
>>>>> Is there value in having a persist somewhere here? For example, if the
>>>>> flatMap step is particularly expensive, will it ever be computed twice
>>>>> when there are no failures?
>>>>>
>>>>> Thanks
>>>>>
>>>>> Arun
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Liquan Pei
>>>> Department of Physics
>>>> University of Massachusetts Amherst
>>>>
>>>
>>>
>>
>>
>> --
>> Liquan Pei
>> Department of Physics
>> University of Massachusetts Amherst
>>
>
>


-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst

Re: General question on persist

Posted by Arun Ahuja <aa...@gmail.com>.
Thanks Liquan, that makes sense, but if I am only doing the computation
once, there will essentially be no difference, correct?

I had a second question related to mapPartitions:
1) All of the records of the Iterator[T] that a single function call in
mapPartitions processes must fit into memory, correct?
2) Is there some way to process that iterator in sorted order?

Thanks!

Arun

On Tue, Sep 23, 2014 at 5:21 PM, Liquan Pei <li...@gmail.com> wrote:

> Hi Arun,
>
> The intermediate results like keyedRecordPieces will not be
> materialized. This means that if you run
>
> partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)
>
> partitioned.mapPartitions(doComputation).save()
>
> again, keyedRecordPieces will be re-computed. In this case, caching or
> persisting keyedRecordPieces is a good idea to eliminate unnecessary
> expensive computation. What you can probably do is
>
> keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces))).cache()
>
> which will cache the RDD referenced by keyedRecordPieces in memory. For
> more options on cache and persist, take a look at
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD.
> There are two APIs you can use to persist RDDs, one of which allows you to
> specify the storage level.
>
> Thanks,
> Liquan
>
>
>
> On Tue, Sep 23, 2014 at 2:08 PM, Arun Ahuja <aa...@gmail.com> wrote:
>
>> I have a general question on when persisting will be beneficial and when
>> it won't:
>>
>> I have a task that runs as follows
>>
>> keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces)))
>> partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)
>>
>> partitioned.mapPartitions(doComputation).save()
>>
>> Is there value in having a persist somewhere here? For example, if the
>> flatMap step is particularly expensive, will it ever be computed twice when
>> there are no failures?
>>
>> Thanks
>>
>> Arun
>>
>
>
>
> --
> Liquan Pei
> Department of Physics
> University of Massachusetts Amherst
>

Re: General question on persist

Posted by Liquan Pei <li...@gmail.com>.
Hi Arun,

The intermediate results like keyedRecordPieces will not be materialized.
This means that if you run

partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)

partitioned.mapPartitions(doComputation).save()

again, keyedRecordPieces will be re-computed. In this case, caching or
persisting keyedRecordPieces is a good idea to eliminate unnecessary
expensive computation. What you can probably do is

keyedRecordPieces = records.flatMap(record => Seq((key, recordPieces))).cache()

which will cache the RDD referenced by keyedRecordPieces in memory. For
more options on cache and persist, take a look at
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD.
There are two APIs you can use to persist RDDs, one of which allows you to
specify the storage level.
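
For example, a sketch using the keyedRecordPieces RDD from above:

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); passing a
// level explicitly lets you spill to disk instead of recomputing when
// cached partitions do not fit in memory.
val cached = keyedRecordPieces.persist(StorageLevel.MEMORY_AND_DISK)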

Thanks,
Liquan



On Tue, Sep 23, 2014 at 2:08 PM, Arun Ahuja <aa...@gmail.com> wrote:

> I have a general question on when persisting will be beneficial and when
> it won't:
>
> I have a task that runs as follow
>
> keyedRecordPieces  = records.flatMap( record => Seq(key, recordPieces))
> partitoned = keyedRecordPieces.partitionBy(KeyPartitioner)
>
> partitoned.mapPartitions(doComputation).save()
>
> Is there value in having a persist somewhere here?  For example if the
> flatMap step is particularly expensive, will it ever be computed twice when
> there are no failures?
>
> Thanks
>
> Arun
>



-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst