Posted to user@spark.apache.org by "Stahlman, Jonathan" <Jo...@capitalone.com> on 2015/07/16 23:18:39 UTC

How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations.  Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance.  Sample code in Python is copied below.

The issue I have is that each new model which is trained caches a set of RDDs and eventually the executors run out of memory.  Is there any way in PySpark to unpersist() these RDDs after each iteration?  The names of the RDDs, which I gathered from the UI, are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  import itertools
  from pyspark.mllib.recommendation import ALS, Rating

  # randomSplit weights are relative, so [99,1,1] is roughly 99%/1%/1%
  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ):
    #train model
    ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    model   = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) )

    #test performance on CV data
    ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    auc = areaUnderCurve( ratings_cv, model.predictAll ) #defined elsewhere

    #save results
    result = ",".join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
    results.append(result)
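
For reference, one blunt workaround for the cache growth described above is to drop every RDD the driver still tracks as persistent at the bottom of each loop iteration. The sketch below goes through PySpark's private _jsc handle and assumes your build exposes JavaSparkContext.getPersistentRDDs(); this is internal plumbing rather than a supported API, and it unpersists everything, so re-cache anything you still need afterwards:

  def unpersist_all(sc):
      # getPersistentRDDs() returns a java.util.Map of RDD id -> JavaRDD;
      # py4j exposes it with a dict-like interface, so items() works
      for rdd_id, rdd in sc._jsc.getPersistentRDDs().items():
          rdd.unpersist()

  # e.g. call unpersist_all(sc) at the end of the hyperparameter loop,
  # after auc has been computed for the current model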

Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Posted by Xiangrui Meng <me...@gmail.com>.
Hi Stahlman,

finalRDDStorageLevel is the storage level for the final user/item
factors. It is not common to set it to StorageLevel.NONE, unless you
want to save the factors directly to disk. If it is NONE, we cannot
unpersist the intermediate RDDs (the in/out blocks) because the final
user/item factors returned are not materialized; unpersisting the
blocks anyway would mean recomputing from the very beginning (or the
last checkpoint) when you materialize the final user/item factors. If
you want to have multiple runs, you can try setting
finalRDDStorageLevel to MEMORY_AND_DISK, or clean up previous runs so
the cached RDDs get garbage collected.

Best,
Xiangrui
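
The trade-off described here can be reproduced with plain RDDs; a minimal sketch, with illustrative names rather than the actual ALS internals:

  # blocks stands in for the cached in/out blocks, factors for the
  # final user/item factors derived from them
  blocks = sc.parallelize(range(1000000)).map(lambda x: x * x).cache()
  factors = blocks.map(lambda x: x + 1)

  # materialize first, then drop the upstream cache: later actions on
  # factors are served from its own cache and never touch blocks again
  factors.cache()
  factors.count()
  blocks.unpersist()

  # unpersisting blocks *before* the count would instead force the count
  # to replay the whole lineage from scratch -- which is why ALS keeps
  # the blocks cached when the final factors are left at NONE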

On Wed, Jul 22, 2015 at 11:35 AM, Ganelin, Ilya
<Il...@capitalone.com> wrote:
> To be unpersisted, an RDD must be persisted first. If it's set to None,
> then it's not persisted, and as such does not need to be freed. Does
> that make sense?


RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Posted by "Ganelin, Ilya" <Il...@capitalone.com>.
To be unpersisted, an RDD must be persisted first. If it's set to None, then it's not persisted, and as such does not need to be freed. Does that make sense?



Thank you,
Ilya Ganelin
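
His point is easy to confirm in a PySpark shell; a quick sketch:

  rdd = sc.parallelize(range(10))
  print(rdd.is_cached)          # False: nothing has been persisted
  print(rdd.getStorageLevel())  # StorageLevel(False, False, False, False, 1), i.e. NONE
  rdd.unpersist()               # harmless no-op: there is nothing to free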



-----Original Message-----
From: Stahlman, Jonathan [Jonathan.Stahlman@capitalone.com]
Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Posted by "Stahlman, Jonathan" <Jo...@capitalone.com>.
Hi Burak,

Looking at the source code, the intermediate RDDs used in ALS.train() are persisted during the computation using intermediateRDDStorageLevel (default value is StorageLevel.MEMORY_AND_DISK) - see:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L546
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L548
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L556

At the end of the ALS calculation these RDDs are no longer needed, and they are not returned, so I would assume the logical choice is to unpersist() them. The strategy in the code is set by finalRDDStorageLevel, which for some reason only triggers unpersist() on the intermediate RDDs if finalRDDStorageLevel != StorageLevel.NONE, which seems counter-intuitive to me.

Jonathan
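
In rough Python pseudocode (names invented here for illustration; the real logic is the Scala ALS.train() linked above), the flow being questioned looks like this:

  # intermediate block RDDs are always persisted during the computation
  user_in_blocks.persist(intermediate_rdd_storage_level)  # default MEMORY_AND_DISK
  item_in_blocks.persist(intermediate_rdd_storage_level)

  # ... ALS iterations run against the cached blocks ...

  if final_rdd_storage_level != StorageLevel.NONE:
      user_factors.persist(final_rdd_storage_level)
      item_factors.persist(final_rdd_storage_level)
      user_factors.count()        # materialize the factors first, so that...
      item_factors.count()
      user_in_blocks.unpersist()  # ...the blocks can be dropped safely
      item_in_blocks.unpersist()
  # when the level is NONE the factors are never materialized inside
  # train(), so the blocks stay cached: dropping them there would force
  # a full recomputation on the caller's first action on the factors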

From: Burak Yavuz <br...@gmail.com>
Date: Wednesday, July 22, 2015 at 10:47 AM
To: Stahlman Jonathan <jo...@capitalone.com>
Cc: "user@spark.apache.org" <us...@spark.apache.org>
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores, memory, number of partitions of ratings_train?

Thanks,
Burak


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Posted by Burak Yavuz <br...@gmail.com>.
Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything.
That's why the unpersist has an if statement before it.
Could you give more information about your setup please? Number of cores,
memory, number of partitions of ratings_train?

Thanks,
Burak
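
For reference, the figures Burak asks about can be read straight off a PySpark shell; a quick sketch, assuming the ratings_train RDD from the sample code is in scope (note sc._conf is a private attribute):

  print(ratings_train.getNumPartitions())   # partitioning of the input
  print(sc.defaultParallelism)              # rough proxy for available cores
  print(sc._conf.get("spark.executor.memory", "<default>"))  # executor memory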


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Posted by "Stahlman, Jonathan" <Jo...@capitalone.com>.
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan



Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Posted by Ewan Higgs <ew...@ugent.be>.
Sean,

Thanks.
It's a developer API and doesn't appear to be exposed.

Ewan

On 07/12/15 15:06, Sean Owen wrote:
> I'm not sure if this is available in Python but from 1.3 on you should
> be able to call ALS.setFinalRDDStorageLevel with level "none" to ask
> it to unpersist when it is done.


Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Posted by Sean Owen <so...@cloudera.com>.
I'm not sure if this is available in Python but from 1.3 on you should
be able to call ALS.setFinalRDDStorageLevel with level "none" to ask
it to unpersist when it is done.



Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Posted by Ewan Higgs <ew...@ugent.be>.
Jonathan,
Did you ever get to the bottom of this? I have some users working with 
Spark in a classroom setting and our example notebooks run into problems 
where there is so much spilled to disk that they run out of quota. A 
1.5G input set becomes >30G of spilled data on disk. I looked into how I 
could unpersist the data so I could clean up the files, but I was 
unsuccessful.

We're using Spark 1.5.0

Yours,
Ewan
