Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/11/13 00:34:00 UTC

[jira] [Resolved] (SPARK-29844) Improper unpersist strategy in ml.recommendation.ALS.train

     [ https://issues.apache.org/jira/browse/SPARK-29844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-29844.
----------------------------------
    Fix Version/s: 3.0.0
       Resolution: Fixed

Issue resolved by pull request 26469
[https://github.com/apache/spark/pull/26469]

> Improper unpersist strategy in ml.recommendation.ALS.train
> ----------------------------------------------------------
>
>                 Key: SPARK-29844
>                 URL: https://issues.apache.org/jira/browse/SPARK-29844
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>    Affects Versions: 2.4.3
>            Reporter: Dong Wang
>            Assignee: Dong Wang
>            Priority: Minor
>             Fix For: 3.0.0
>
>
> In ml.recommendation.ALS.train(), there are many intermediate RDDs. At the end of the method, unpersist() is invoked on these RDDs, but the timing of the unpersist calls is wrong, which causes recomputation and wastes memory.
> {code:scala}
>     val userIdAndFactors = userInBlocks
>       .mapValues(_.srcIds)
>       .join(userFactors)
>       .mapPartitions({ items =>
>         items.flatMap { case (_, (ids, factors)) =>
>           ids.view.zip(factors)
>         }
>       // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
>       // and userFactors.
>       }, preservesPartitioning = true)
>       .setName("userFactors")
>       .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
>     val itemIdAndFactors = itemInBlocks
>       .mapValues(_.srcIds)
>       .join(itemFactors)
>       .mapPartitions({ items =>
>         items.flatMap { case (_, (ids, factors)) =>
>           ids.view.zip(factors)
>         }
>       }, preservesPartitioning = true)
>       .setName("itemFactors")
>       .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
>     if (finalRDDStorageLevel != StorageLevel.NONE) {
>       userIdAndFactors.count()
>       itemFactors.unpersist() // Premature unpersist
>       itemIdAndFactors.count()
>       userInBlocks.unpersist() // Lagging unpersist
>       userOutBlocks.unpersist() // Lagging unpersist
>       itemInBlocks.unpersist() 
>       itemOutBlocks.unpersist() // Lagging unpersist
>       blockRatings.unpersist() // Lagging unpersist
>     }
>     (userIdAndFactors, itemIdAndFactors)
>   }
> {code}
> 1. itemFactors is unpersisted too early. itemIdAndFactors.count() still uses itemFactors, so itemFactors will be recomputed.
> 2. userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings are unpersisted too late. The final action, itemIdAndFactors.count(), does not use these RDDs, so they can be unpersisted before it to save memory.
> By the way, itemIdAndFactors is persisted here but is never unpersisted until the application ends. This may hurt performance, but I think it's hard to fix.
> This issue is reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
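> A sketch of one possible reordering, using only the RDD names from the snippet above (the actual merged fix is in pull request 26469; this is just an illustration of the intended ordering, not the merged code). Since itemIdAndFactors is computed from itemInBlocks and itemFactors, both must stay persisted until after itemIdAndFactors.count(); the user-side blocks and blockRatings are only needed for userIdAndFactors.count() and can be released right after it:
> {code:scala}
>     if (finalRDDStorageLevel != StorageLevel.NONE) {
>       userIdAndFactors.count()
>       // These RDDs are not needed by itemIdAndFactors.count(),
>       // so release them as early as possible to save memory.
>       userInBlocks.unpersist()
>       userOutBlocks.unpersist()
>       itemOutBlocks.unpersist()
>       blockRatings.unpersist()
>       itemIdAndFactors.count()
>       // Unpersist itemFactors and itemInBlocks only after
>       // itemIdAndFactors is materialized; unpersisting them
>       // earlier forces itemIdAndFactors to recompute them.
>       itemFactors.unpersist()
>       itemInBlocks.unpersist()
>     }
> {code}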



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org