Posted to issues@spark.apache.org by "Joseph K. Bradley (JIRA)" <ji...@apache.org> on 2016/03/04 18:49:41 UTC

[jira] [Commented] (SPARK-13048) EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel

    [ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180241#comment-15180241 ] 

Joseph K. Bradley commented on SPARK-13048:
-------------------------------------------

I'd say the best fix would be to add an option to LDA to not delete the last checkpoint.  I'd prefer to expose this as a Param in the spark.ml API, but it could be added to the spark.mllib API as well if necessary.
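
For illustration only (the param name and default below are hypothetical, not a settled API), the spark.ml side could expose this as a shared boolean Param along these lines, which EMLDAOptimizer.getLDAModel would check before calling deleteAllCheckpoints():

{noformat}
import org.apache.spark.ml.param.{BooleanParam, Params}

/**
 * Hypothetical sketch of a shared param trait that ml.clustering.LDA could mix in.
 * When true, EMLDAOptimizer would skip deleting its most recent checkpoint, leaving
 * it readable by the returned DistributedLDAModel; the caller would then be
 * responsible for cleaning up the checkpoint directory afterwards.
 */
private[clustering] trait HasKeepLastCheckpoint extends Params {

  final val keepLastCheckpoint: BooleanParam = new BooleanParam(this,
    "keepLastCheckpoint",
    "whether to keep the last checkpoint so the DistributedLDAModel can still read it")

  final def getKeepLastCheckpoint: Boolean = $(keepLastCheckpoint)

  setDefault(keepLastCheckpoint -> true)
}
{noformat}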

[~holdenk]  I agree we need to figure out how to handle/control caching and checkpointing within Pipelines, but that will have to wait for after 2.0.

[~jvstein]  We try to minimize the public API.  Although I agree with you about opening up APIs in principle, it has proven dangerous in practice.  Even when we mark things DeveloperApi, many users still use those APIs, making it difficult to change them in the future.

> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> ------------------------------------------------------------------
>
>                 Key: SPARK-13048
>                 URL: https://issues.apache.org/jira/browse/SPARK-13048
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.5.2
>         Environment: Standalone Spark cluster
>            Reporter: Jeff Stein
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the DistributedLDAModel.
> The most recent checkpoint is still necessary for operations on the DistributedLDAModel under a couple of scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. a very large data set).
> - Late worker failures that require re-reading the checkpoint the model now depends on.
> I ran into this problem running a 10M record LDA model in a memory starved environment. The model consistently failed in either the {{collect at LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or in the {{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the model). In both cases, a FileNotFoundException is thrown attempting to access a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature change. An alternative simple fix is to leave the last checkpoint around and expect the user to clean the checkpoint directory themselves (a sketch of that variant follows the relevant code below).
> {noformat}
> java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
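> For context, this checkpointing path is only exercised once a checkpoint directory is configured; a minimal setup that reaches the failure (paths and parameter values are illustrative, not the exact job) looks roughly like the following:
> {noformat}
> import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
> import org.apache.spark.mllib.linalg.Vectors
>
> // Checkpointing is enabled only once a checkpoint directory is set.
> sc.setCheckpointDir("/hdfs/path/to/checkpoints")
>
> // Tiny stand-in corpus of (docId, termCounts) pairs; the real job used ~10M records.
> val corpus = sc.parallelize(Seq(
>   (0L, Vectors.dense(1.0, 2.0, 0.0)),
>   (1L, Vectors.dense(0.0, 1.0, 3.0))))
>
> val model = new LDA()
>   .setK(10)
>   .setOptimizer("em")
>   .setCheckpointInterval(10)
>   .run(corpus)
>   .asInstanceOf[DistributedLDAModel]
>
> // Both of these failed with FileNotFoundException once the checkpoint was deleted:
> val localModel = model.toLocal
> val topics = model.describeTopics(maxTermsPerTopic = 10)
> {noformat}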
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
>     // LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala:
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}
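> A sketch of the "leave the last checkpoint around" alternative mentioned above (the method name is hypothetical): drain the queue down to its newest entry instead of emptying it, so the returned model can still recompute from that checkpoint.
> {noformat}
>   /**
>    * Hypothetical variant of deleteAllCheckpoints(): removes every checkpoint except
>    * the most recent one, leaving it readable by the DistributedLDAModel. The caller
>    * becomes responsible for eventually deleting that last checkpoint directory.
>    */
>   def deleteAllCheckpointsButLast(): Unit = {
>     while (checkpointQueue.size > 1) {
>       removeCheckpointFile()
>     }
>   }
> {noformat}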


