Posted to issues@spark.apache.org by "Aman Omer (Jira)" <ji...@apache.org> on 2019/11/09 19:08:00 UTC

[jira] [Updated] (SPARK-29817) Missing persist on docs in mllib.clustering.LDAOptimizer.initialize

     [ https://issues.apache.org/jira/browse/SPARK-29817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aman Omer updated SPARK-29817:
------------------------------
        Parent: SPARK-29818
    Issue Type: Sub-task  (was: Improvement)

> Missing persist on docs in mllib.clustering.LDAOptimizer.initialize
> -------------------------------------------------------------------
>
>                 Key: SPARK-29817
>                 URL: https://issues.apache.org/jira/browse/SPARK-29817
>             Project: Spark
>          Issue Type: Sub-task
>          Components: MLlib
>    Affects Versions: 2.4.3
>            Reporter: Dong Wang
>            Priority: Major
>
> The RDD docs in mllib.clustering.LDAOptimizer.initialize feeds two separate computations: verticesTMP.reduceByKey (through the edges RDD) and docs.take(1). Because docs is never persisted, it is recomputed for each of them; it should be persisted.
> {code:scala}
>   override private[clustering] def initialize(
>       docs: RDD[(Long, Vector)],
>       lda: LDA): EMLDAOptimizer = {
>       ...
>     val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
>       // Add edges for terms with non-zero counts.
>       termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
>         Edge(docID, term2index(term), cnt)
>       }
>     }
>     // Create vertices.
>     // Initially, we use random soft assignments of tokens to topics (random gamma).
>     val docTermVertices: RDD[(VertexId, TopicCounts)] = {
>       val verticesTMP: RDD[(VertexId, TopicCounts)] =
>         edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
>           val random = new Random(partIndex + randomSeed)
>           partEdges.flatMap { edge =>
>             val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
>             val sum = gamma * edge.attr
>             Seq((edge.srcId, sum), (edge.dstId, sum))
>           }
>         }
>       verticesTMP.reduceByKey(_ + _) // RDD lineage: verticesTMP <- edges <- docs; first use of docs
>     }
>     // Partition such that edges are grouped by document
>     this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
>     this.k = k
>     this.vocabSize = docs.take(1).head._2.size // Second use of docs
> {code}
> This issue is reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
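> A minimal sketch of one possible fix, for illustration only (the handlePersistence guard follows the idiom used elsewhere in MLlib; the eventual patch may differ): cache docs before the first job and release it only after both uses have run.
> {code:scala}
> import org.apache.spark.storage.StorageLevel
>
>   override private[clustering] def initialize(
>       docs: RDD[(Long, Vector)],
>       lda: LDA): EMLDAOptimizer = {
>     // Persist docs only if the caller has not already done so.
>     val handlePersistence = docs.getStorageLevel == StorageLevel.NONE
>     if (handlePersistence) {
>       docs.persist(StorageLevel.MEMORY_AND_DISK)
>     }
>     ... // build edges, docTermVertices, and this.graph as above
>     this.vocabSize = docs.take(1).head._2.size // second use of docs
>     ... // remainder of initialize, which materializes this.graph
>     // Safe to release docs once the graph no longer has to be recomputed.
>     if (handlePersistence) {
>       docs.unpersist()
>     }
>     this
>   }
> {code}
> With the persist in place, the reduceByKey job and docs.take(1) both read the cached partitions instead of recomputing docs from its lineage.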



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org