Posted to issues@spark.apache.org by "Joseph K. Bradley (JIRA)" <ji...@apache.org> on 2016/04/05 02:10:25 UTC

[jira] [Assigned] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel

     [ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joseph K. Bradley reassigned SPARK-13048:
-----------------------------------------

    Assignee: Joseph K. Bradley

> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> ------------------------------------------------------------------
>
>                 Key: SPARK-13048
>                 URL: https://issues.apache.org/jira/browse/SPARK-13048
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.5.2
>         Environment: Standalone Spark cluster
>            Reporter: Jeff Stein
>            Assignee: Joseph K. Bradley
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the DistributedLDAModel.
> The most recent checkpoint is still needed for operations on the DistributedLDAModel in at least two scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. a very large data set).
> - A worker fails late in the job, so recovery requires reading the checkpoint the model now depends on.
> I ran into this problem running a 10M-record LDA model in a memory-starved environment. The job consistently failed in either the {{collect at LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or the {{reduce at LDAModel.scala:563}} stage (when calling {{describeTopics}} on the model). In both cases, a FileNotFoundException is thrown while attempting to access a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature change. A simpler alternative is to keep the last checkpoint and leave it to the user to clean up the checkpoint directory.
> {noformat}
> java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
>     // LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala:
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}
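
The "leave the last checkpoint around" alternative mentioned in the description can be illustrated with a minimal sketch. This is not the Spark implementation: SketchCheckpointer, register, deleteAllCheckpointsButLast, and remainingCheckpointFiles are hypothetical names, and the class tracks plain local files instead of RDD checkpoints on HDFS. It only shows the queue logic: delete every checkpoint except the newest, and report what is left so the caller can clean it up later.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable

// Hypothetical, simplified stand-in for PeriodicCheckpointer.
// It tracks local files rather than RDD checkpoints on HDFS.
class SketchCheckpointer {
  // Oldest checkpoint at the front of the queue, newest at the back.
  private val checkpointQueue = mutable.Queue.empty[Path]

  /** Record a checkpoint file that has been written. */
  def register(checkpointFile: Path): Unit =
    checkpointQueue.enqueue(checkpointFile)

  /** Delete every checkpoint except the most recent one. */
  def deleteAllCheckpointsButLast(): Unit = {
    while (checkpointQueue.size > 1) {
      removeCheckpointFile()
    }
  }

  /** Files still on disk; the caller is responsible for deleting them. */
  def remainingCheckpointFiles: List[Path] = checkpointQueue.toList

  // Dequeue the oldest checkpoint and remove its file.
  // Prints a warning but does not fail if the file cannot be removed.
  private def removeCheckpointFile(): Unit = {
    val old = checkpointQueue.dequeue()
    try {
      Files.deleteIfExists(old)
    } catch {
      case e: Exception =>
        Console.err.println(s"Could not remove old checkpoint file: $old (${e.getMessage})")
    }
  }
}
```

Under this assumption, getLDAModel would call the "but last" variant instead of deleteAllCheckpoints(), and the returned model (or the user) would take over responsibility for the files listed by remainingCheckpointFiles. Whether that responsibility belongs on the model or on the user is the open design question raised in the description.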



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)