Posted to issues@spark.apache.org by "Dong Wang (Jira)" <ji...@apache.org> on 2019/11/11 12:24:00 UTC
[jira] [Created] (SPARK-29844) Wrong unpersist strategy in ml.recommendation.ASL.train
Dong Wang created SPARK-29844:
---------------------------------
Summary: Wrong unpersist strategy in ml.recommendation.ASL.train
Key: SPARK-29844
URL: https://issues.apache.org/jira/browse/SPARK-29844
Project: Spark
Issue Type: Improvement
Components: ML
Affects Versions: 3.0.0
Reporter: Dong Wang
ml.recommendation.ALS.train() creates many intermediate RDDs. At the end of the method these RDDs are unpersisted, but the unpersist() calls are placed at the wrong points, which causes recomputation and wastes memory.
{code:scala}
  val userIdAndFactors = userInBlocks
    .mapValues(_.srcIds)
    .join(userFactors)
    .mapPartitions({ items =>
      items.flatMap { case (_, (ids, factors)) =>
        ids.view.zip(factors)
      }
    // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
    // and userFactors.
    }, preservesPartitioning = true)
    .setName("userFactors")
    .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
  val itemIdAndFactors = itemInBlocks
    .mapValues(_.srcIds)
    .join(itemFactors)
    .mapPartitions({ items =>
      items.flatMap { case (_, (ids, factors)) =>
        ids.view.zip(factors)
      }
    }, preservesPartitioning = true)
    .setName("itemFactors")
    .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
  if (finalRDDStorageLevel != StorageLevel.NONE) {
    userIdAndFactors.count()
    itemFactors.unpersist() // Premature unpersist
    itemIdAndFactors.count()
    userInBlocks.unpersist() // Lagging unpersist
    userOutBlocks.unpersist() // Lagging unpersist
    itemInBlocks.unpersist()
    itemOutBlocks.unpersist() // Lagging unpersist
    blockRatings.unpersist() // Lagging unpersist
  }
  (userIdAndFactors, itemIdAndFactors)
}
{code}
1. itemFactors is unpersisted too early. itemIdAndFactors is derived from itemFactors, so itemIdAndFactors.count() has to recompute itemFactors.
2. userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings are unpersisted too late. The final action, itemIdAndFactors.count(), does not use these RDDs, so they can be unpersisted before it to save memory.
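To make the cost of the two orderings concrete, here is a minimal plain-Scala sketch. It does not use Spark: `Node`, `CacheModel`, and `UnpersistOrderDemo` are hypothetical, and the ALS iterations are collapsed into a single dependency on blockRatings. It only models the rule that an action recomputes any ancestor whose cached copy has been dropped, then replays the original order and one possible reordering.

```scala
import scala.collection.mutable

// Hypothetical toy model of lineage plus caching. This is NOT Spark's API; it only
// mimics the rule that an action recomputes any ancestor whose cached copy is gone.
final case class Node(name: String, parents: Seq[Node] = Nil)

final class CacheModel {
  private val persisted = mutable.Set.empty[String]    // marked with persist()
  private val materialized = mutable.Set.empty[String] // blocks currently cached
  val computeCount: mutable.Map[String, Int] = mutable.Map.empty[String, Int].withDefaultValue(0)

  def persist(n: Node): Unit = persisted += n.name
  def unpersist(n: Node): Unit = { persisted -= n.name; materialized -= n.name }
  def cachedSize: Int = materialized.size

  // Stand-in for an action such as count(): evaluates n, reusing cached ancestors.
  def evaluate(n: Node): Unit =
    if (!materialized.contains(n.name)) {
      n.parents.foreach(evaluate)
      computeCount(n.name) += 1
      if (persisted.contains(n.name)) materialized += n.name
    }
}

object UnpersistOrderDemo {
  /** Returns (times itemFactors was computed, blocks cached right after the last count()). */
  def run(fixedOrder: Boolean): (Int, Int) = {
    val m = new CacheModel
    val ratings = Node("blockRatings")
    val userIn = Node("userInBlocks", Seq(ratings))
    val userOut = Node("userOutBlocks", Seq(ratings))
    val itemIn = Node("itemInBlocks", Seq(ratings))
    val itemOut = Node("itemOutBlocks", Seq(ratings))
    // Simplification: the ALS iterations are collapsed into one dependency on ratings.
    val userFactors = Node("userFactors", Seq(ratings))
    val itemFactors = Node("itemFactors", Seq(ratings))
    val userIdAndFactors = Node("userIdAndFactors", Seq(userIn, userFactors))
    val itemIdAndFactors = Node("itemIdAndFactors", Seq(itemIn, itemFactors))

    // State before the final block: the intermediates are persisted and materialized.
    // userFactors is never unpersisted in either branch, mirroring the snippet.
    val intermediates = Seq(ratings, userIn, userOut, itemIn, itemOut, userFactors, itemFactors)
    intermediates.foreach { n => m.persist(n); m.evaluate(n) }
    m.persist(userIdAndFactors)
    m.persist(itemIdAndFactors)

    m.evaluate(userIdAndFactors) // userIdAndFactors.count()
    if (fixedOrder) {
      // Release what itemIdAndFactors does not need before its count().
      Seq(userIn, userOut, itemOut, ratings).foreach(m.unpersist)
      m.evaluate(itemIdAndFactors) // itemIdAndFactors.count()
      val cached = m.cachedSize
      // itemFactors and itemInBlocks are released only after the last count().
      Seq(itemFactors, itemIn).foreach(m.unpersist)
      (m.computeCount("itemFactors"), cached)
    } else {
      m.unpersist(itemFactors) // premature: itemIdAndFactors still needs it
      m.evaluate(itemIdAndFactors) // itemIdAndFactors.count()
      val cached = m.cachedSize
      Seq(userIn, userOut, itemIn, itemOut, ratings).foreach(m.unpersist)
      (m.computeCount("itemFactors"), cached)
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"original order: ${run(fixedOrder = false)}") // prints "original order: (2,8)"
    println(s"reordered:      ${run(fixedOrder = true)}")  // prints "reordered:      (1,5)"
  }
}
```

In this toy model the original order computes itemFactors a second time and still holds eight cached blocks when the last count() finishes; releasing the lagging RDDs first and itemFactors last avoids the recomputation and leaves five.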
By the way, userIdAndFactors and itemIdAndFactors are persisted here and stay cached until the application ends. This may hurt performance, but I think it is hard to fix.
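One reason this is awkward is that train() returns the persisted RDDs to its caller, so the method itself cannot know when they are no longer needed; only the consumer can. A minimal sketch of caller-side release with the loan pattern follows. `Releasable`, `CallerOwnsCache`, and `Recorder` are hypothetical stand-ins (for something with an `unpersist()` method, such as an RDD); this is not how Spark's ALS handles the results.

```scala
// Hypothetical stand-in for a handle with an unpersist() method, such as an RDD.
trait Releasable { def unpersist(): Unit }

object CallerOwnsCache {
  // Loan pattern: lend the persisted results to `body`, then release them even if
  // `body` throws. (If one unpersist() itself throws, the remaining ones are skipped.)
  def usingPersisted[A](results: Releasable*)(body: => A): A =
    try body
    finally results.foreach(_.unpersist())

  // Toy handle that records whether it was released.
  final class Recorder(val name: String) extends Releasable {
    var released = false
    def unpersist(): Unit = released = true
  }

  def main(args: Array[String]): Unit = {
    val users = new Recorder("userIdAndFactors")
    val items = new Recorder("itemIdAndFactors")
    val n = usingPersisted(users, items) {
      // Consume both results here; a constant stands in for real work.
      2
    }
    println(s"result=$n, released=${users.released && items.released}") // prints "result=2, released=true"
  }
}
```

The catch is that a caller which keeps the results alive (for example, inside a long-lived model) has no natural point at which `body` ends, which is consistent with the issue's view that this is hard to fix.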
This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() APIs.
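CacheCheck's implementation is not described in this issue. Purely to illustrate what the two misuse patterns mean on a recorded trace, here is a much simpler, hypothetical checker. The `Event` types, `TraceCheck`, and the hand-written trace are assumptions, not CacheCheck's design: an unpersist is "premature" if a later action reads the RDD before it is persisted again, and "lagging" if some action ran between the RDD's last read and its unpersist.

```scala
// Hypothetical trace format: persist/unpersist calls and actions with the cached RDDs they read.
sealed trait Event
final case class Persist(rdd: String) extends Event
final case class Unpersist(rdd: String) extends Event
final case class Action(name: String, reads: Set[String]) extends Event

object TraceCheck {
  /** Returns (premature, lagging) unpersist targets found in a recorded trace. */
  def check(trace: IndexedSeq[Event]): (Set[String], Set[String]) = {
    val premature = Set.newBuilder[String]
    val lagging = Set.newBuilder[String]
    val unpersists = trace.zipWithIndex.collect { case (Unpersist(r), i) => (r, i) }
    for ((r, i) <- unpersists) {
      def readsR(e: Event): Boolean = e match {
        case Action(_, rs) => rs(r)
        case _             => false
      }
      // Premature: a later action reads r before r is persisted again.
      val after = trace.drop(i + 1).takeWhile(_ != Persist(r))
      if (after.exists(readsR)) premature += r
      else {
        // Lagging: some action ran between r's last read and this unpersist,
        // so r could have been released before that action.
        val before = trace.take(i)
        val lastRead = before.lastIndexWhere(readsR)
        if (lastRead >= 0 && before.drop(lastRead + 1).exists(_.isInstanceOf[Action])) lagging += r
      }
    }
    (premature.result(), lagging.result())
  }
}

object TraceCheckDemo {
  // Hand-written trace mirroring the snippet; "ALS iterations" is a simplification
  // standing for all earlier jobs that read the intermediate RDDs.
  val alsTrace: IndexedSeq[Event] = Vector(
    Action("ALS iterations", Set("blockRatings", "userInBlocks", "userOutBlocks",
      "itemInBlocks", "itemOutBlocks", "userFactors", "itemFactors")),
    Action("userIdAndFactors.count()", Set("userInBlocks", "userFactors")),
    Unpersist("itemFactors"),
    Action("itemIdAndFactors.count()", Set("itemInBlocks", "itemFactors")),
    Unpersist("userInBlocks"), Unpersist("userOutBlocks"), Unpersist("itemInBlocks"),
    Unpersist("itemOutBlocks"), Unpersist("blockRatings"))

  def main(args: Array[String]): Unit = {
    val (premature, lagging) = TraceCheck.check(alsTrace)
    println(s"premature: $premature")
    println(s"lagging:   $lagging")
  }
}
```

On this trace the checker reports itemFactors as premature and userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings as lagging, which matches the two points listed in the issue. A real detector would have to derive each action's reads from the lineage rather than take them by hand.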
--
This message was sent by Atlassian Jira
(v8.3.4#803005)