Posted to issues@spark.apache.org by "Aman Omer (Jira)" <ji...@apache.org> on 2019/11/09 19:08:00 UTC
[jira] [Updated] (SPARK-29817) Missing persist on docs in mllib.clustering.LDAOptimizer.initialize
[ https://issues.apache.org/jira/browse/SPARK-29817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aman Omer updated SPARK-29817:
------------------------------
Parent: SPARK-29818
Issue Type: Sub-task (was: Improvement)
> Missing persist on docs in mllib.clustering.LDAOptimizer.initialize
> -------------------------------------------------------------------
>
> Key: SPARK-29817
> URL: https://issues.apache.org/jira/browse/SPARK-29817
> Project: Spark
> Issue Type: Sub-task
> Components: MLlib
> Affects Versions: 2.4.3
> Reporter: Dong Wang
> Priority: Major
>
> The RDD docs in mllib.clustering.LDAOptimizer.initialize is used in two places: in the lineage of verticesTMP.reduceByKey, and in docs.take(1). It should be persisted so that it is not recomputed for the second use.
> {code:scala}
> override private[clustering] def initialize(
>     docs: RDD[(Long, Vector)],
>     lda: LDA): EMLDAOptimizer = {
>   ...
>   val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
>     // Add edges for terms with non-zero counts.
>     termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
>       Edge(docID, term2index(term), cnt)
>     }
>   }
>   // Create vertices.
>   // Initially, we use random soft assignments of tokens to topics (random gamma).
>   val docTermVertices: RDD[(VertexId, TopicCounts)] = {
>     val verticesTMP: RDD[(VertexId, TopicCounts)] =
>       edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
>         val random = new Random(partIndex + randomSeed)
>         partEdges.flatMap { edge =>
>           val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
>           val sum = gamma * edge.attr
>           Seq((edge.srcId, sum), (edge.dstId, sum))
>         }
>       }
>     verticesTMP.reduceByKey(_ + _) // RDD dependency: verticesTMP - edges - docs. First use docs
>   }
>   // Partition such that edges are grouped by document
>   this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
>   this.k = k
>   this.vocabSize = docs.take(1).head._2.size // Second use docs
> {code}
> This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
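>
> As a rough illustration of the suggested fix (not the actual patch), the sketch below persists an RDD shaped like docs before it is used by two jobs and releases it afterwards. The object name PersistSketch, the method summarize, the synthetic corpus, and the choice of StorageLevel.MEMORY_AND_DISK are assumptions made for this example only. In initialize itself the graph is built lazily, so the point at which docs can safely be unpersisted would have to be checked separately.
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.storage.StorageLevel
>
> // Hypothetical stand-alone example; it is not part of Spark.
> object PersistSketch {
>
>   // Uses docs in two jobs, persisting it first so the second job can read cached partitions.
>   def summarize(docs: RDD[(Long, Vector)]): (Double, Int) = {
>     // Assumed storage level; the right choice depends on the workload.
>     docs.persist(StorageLevel.MEMORY_AND_DISK)
>     try {
>       // First use: a job that reads every document.
>       val totalCount = docs.map { case (_, termCounts) => termCounts.toArray.sum }.reduce(_ + _)
>       // Second use: mirrors docs.take(1).head._2.size in the snippet above.
>       val vocabSize = docs.take(1).head._2.size
>       (totalCount, vocabSize)
>     } finally {
>       // Release the cached blocks once both uses have completed.
>       docs.unpersist()
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local[2]").setAppName("persist-sketch")
>     val sc = new SparkContext(conf)
>     try {
>       // Synthetic corpus: 100 documents over an 8-term vocabulary.
>       val docs: RDD[(Long, Vector)] =
>         sc.parallelize(0L until 100L, 4).map(id => (id, Vectors.dense(Array.fill(8)(1.0))))
>       val (totalCount, vocabSize) = summarize(docs)
>       println(s"totalCount=$totalCount vocabSize=$vocabSize")
>     } finally {
>       sc.stop()
>     }
>   }
> }
> {code}
> Wrapping the two uses in try/finally keeps the cached blocks from outliving the method if either job fails.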