Posted to issues@spark.apache.org by "Joseph K. Bradley (JIRA)" <ji...@apache.org> on 2016/04/08 04:49:25 UTC
[jira] [Resolved] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
[ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joseph K. Bradley resolved SPARK-13048.
---------------------------------------
Resolution: Fixed
Fix Version/s: 2.0.0
Issue resolved by pull request 12166
[https://github.com/apache/spark/pull/12166]
> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> ------------------------------------------------------------------
>
> Key: SPARK-13048
> URL: https://issues.apache.org/jira/browse/SPARK-13048
> Project: Spark
> Issue Type: Bug
> Components: MLlib
> Affects Versions: 1.5.2
> Environment: Standalone Spark cluster
> Reporter: Jeff Stein
> Assignee: Joseph K. Bradley
> Fix For: 2.0.0
>
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the DistributedLDAModel.
> The most recent checkpoint is still needed for operations on the DistributedLDAModel in at least two scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. very large data set).
> - Late worker failures that require reading the now-dependent checkpoint.
> I ran into this problem running a 10M-record LDA model in a memory-starved environment. The model consistently failed in either the {{collect at LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or the {{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the model). In both cases, a FileNotFoundException is thrown while attempting to access a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature change. An alternative simple fix is to leave the last checkpoint around and expect the user to clean the checkpoint directory themselves.
> {noformat}
> java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
> override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>   require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>   this.graphCheckpointer.deleteAllCheckpoints()
>   // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
>   // LDAModel.toLocal conversion
>   new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>     Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>     iterationTimes)
> }
> {noformat}
> PeriodicCheckpointer.scala:
> {noformat}
> /**
>  * Call this at the end to delete any remaining checkpoint files.
>  */
> def deleteAllCheckpoints(): Unit = {
>   while (checkpointQueue.nonEmpty) {
>     removeCheckpointFile()
>   }
> }
> /**
>  * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>  * This prints a warning but does not fail if the files cannot be removed.
>  */
> private def removeCheckpointFile(): Unit = {
>   val old = checkpointQueue.dequeue()
>   // Since the old checkpoint is not deleted by Spark, we manually delete it.
>   val fs = FileSystem.get(sc.hadoopConfiguration)
>   getCheckpointFiles(old).foreach { checkpointFile =>
>     try {
>       fs.delete(new Path(checkpointFile), true)
>     } catch {
>       case e: Exception =>
>         logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>           checkpointFile)
>     }
>   }
> }
> {noformat}
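
The "simple fix" mentioned in the description (keep the most recent checkpoint and leave cleanup to the user) can be sketched without Spark as follows. This is a minimal illustration only, not the change made in the pull request: the class CheckpointTracker, the method deleteAllCheckpointsButLast, and the deleteFile callback are hypothetical names, and the queue holds plain path strings rather than checkpointed Datasets.

```scala
import scala.collection.mutable

// Hypothetical stand-in for PeriodicCheckpointer. It tracks checkpoint paths
// only and delegates the actual deletion to a caller-supplied function.
class CheckpointTracker(deleteFile: String => Unit) {
  // Oldest checkpoint at the front, most recent at the back.
  private val checkpointQueue = mutable.Queue.empty[String]

  def register(path: String): Unit = checkpointQueue += path

  // Same behavior as the quoted deleteAllCheckpoints: nothing survives.
  def deleteAllCheckpoints(): Unit = {
    while (checkpointQueue.nonEmpty) removeOldest()
  }

  // Assumed alternative: stop once a single (most recent) checkpoint is left,
  // so a model that still depends on it can be recomputed from it.
  def deleteAllCheckpointsButLast(): Unit = {
    while (checkpointQueue.size > 1) removeOldest()
  }

  // Paths that have not been deleted yet, oldest first.
  def remaining: Seq[String] = checkpointQueue.toList

  private def removeOldest(): Unit = deleteFile(checkpointQueue.dequeue())
}
```

With three registered checkpoints, deleteAllCheckpointsButLast() removes the two oldest and leaves the newest in place. The trade-off is the one noted in the description: the remaining checkpoint files stay on disk until the caller deletes them.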
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)