Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/11/10 19:20:00 UTC

[jira] [Resolved] (SPARK-29817) Missing persist on docs in mllib.clustering.LDAOptimizer.initialize

     [ https://issues.apache.org/jira/browse/SPARK-29817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-29817.
----------------------------------
    Resolution: Duplicate

We should not be making 10+ JIRAs here. The changes in question are small and closely related and probably need to be considered together; otherwise they may be resolved differently by different people at different times. I'm going to close them in favor of the parent. We can consider breaking out some of them later.

> Missing persist on docs in mllib.clustering.LDAOptimizer.initialize
> -------------------------------------------------------------------
>
>                 Key: SPARK-29817
>                 URL: https://issues.apache.org/jira/browse/SPARK-29817
>             Project: Spark
>          Issue Type: Sub-task
>          Components: MLlib
>    Affects Versions: 2.4.3
>            Reporter: Dong Wang
>            Priority: Major
>
> The RDD docs in mllib.clustering.LDAOptimizer.initialize is used twice: by verticesTMP.reduceByKey and by docs.take(1). Because docs is never persisted, its full lineage is recomputed for the second use; it should be persisted before the first use.
> {code:scala}
>   override private[clustering] def initialize(
>       docs: RDD[(Long, Vector)],
>       lda: LDA): EMLDAOptimizer = {
>       ...
>     val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
>       // Add edges for terms with non-zero counts.
>       termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
>         Edge(docID, term2index(term), cnt)
>       }
>     }
>     // Create vertices.
>     // Initially, we use random soft assignments of tokens to topics (random gamma).
>     val docTermVertices: RDD[(VertexId, TopicCounts)] = {
>       val verticesTMP: RDD[(VertexId, TopicCounts)] =
>         edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
>           val random = new Random(partIndex + randomSeed)
>           partEdges.flatMap { edge =>
>             val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
>             val sum = gamma * edge.attr
>             Seq((edge.srcId, sum), (edge.dstId, sum))
>           }
>         }
>       verticesTMP.reduceByKey(_ + _) // RDD lineage: verticesTMP <- edges <- docs. First use of docs
>     }
>     // Partition such that edges are grouped by document
>     this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
>     this.k = k
>     this.vocabSize = docs.take(1).head._2.size // Second use of docs
> {code}
> This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
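> A minimal, self-contained sketch of the suggested pattern (a hypothetical standalone example, not the actual Spark patch): persist the RDD before its first use, run both computations, then unpersist once neither needs it anymore.
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.storage.StorageLevel
>
> object PersistSketch {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .master("local[*]")
>       .appName("persist-sketch")
>       .getOrCreate()
>     val sc = spark.sparkContext
>
>     // Stand-in for the `docs` RDD: (docId, count) pairs.
>     val docs = sc.parallelize(Seq((0L, 3), (1L, 5), (0L, 2)))
>
>     // Persist before the first action so the second action reuses the
>     // cached partitions instead of recomputing the lineage from scratch.
>     docs.persist(StorageLevel.MEMORY_AND_DISK)
>
>     val counts = docs.reduceByKey(_ + _).collect() // first use (collect is the action)
>     val first = docs.take(1)                       // second use, served from cache
>
>     docs.unpersist()                               // release once both uses are done
>     println(s"counts=${counts.mkString(",")} first=${first.mkString(",")}")
>     spark.stop()
>   }
> }
> {code}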


