Posted to issues@spark.apache.org by "Dong Wang (Jira)" <ji...@apache.org> on 2019/11/09 14:59:00 UTC

[jira] [Created] (SPARK-29817) Missing persist on docs in mllib.clustering.LDAOptimizer.initialize

Dong Wang created SPARK-29817:
---------------------------------

             Summary: Missing persist on docs in mllib.clustering.LDAOptimizer.initialize
                 Key: SPARK-29817
                 URL: https://issues.apache.org/jira/browse/SPARK-29817
             Project: Spark
          Issue Type: Improvement
          Components: MLlib
    Affects Versions: 2.4.3
            Reporter: Dong Wang


The RDD docs in mllib.clustering.LDAOptimizer.initialize is used by two jobs: one through verticesTMP.reduceByKey (run when the graph is built) and one through docs.take(1). Since docs is not persisted, the second use recomputes it from scratch, so it should be persisted.
{code:scala}
  override private[clustering] def initialize(
      docs: RDD[(Long, Vector)],
      lda: LDA): EMLDAOptimizer = {
      ...
    val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
      // Add edges for terms with non-zero counts.
      termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
        Edge(docID, term2index(term), cnt)
      }
    }
    // Create vertices.
    // Initially, we use random soft assignments of tokens to topics (random gamma).
    val docTermVertices: RDD[(VertexId, TopicCounts)] = {
      val verticesTMP: RDD[(VertexId, TopicCounts)] =
        edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
          val random = new Random(partIndex + randomSeed)
          partEdges.flatMap { edge =>
            val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
            val sum = gamma * edge.attr
            Seq((edge.srcId, sum), (edge.dstId, sum))
          }
        }
      verticesTMP.reduceByKey(_ + _) // RDD lineage: verticesTMP <- edges <- docs. First use of docs
    }
    // Partition such that edges are grouped by document
    this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
    this.k = k
    this.vocabSize = docs.take(1).head._2.size // Second use of docs
    ...
  }
{code}
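
A possible fix, sketched below, would follow the handlePersistence idiom used elsewhere in MLlib: persist docs before the first job and unpersist it once the graph is materialized. The placement and storage level here are assumptions, not a committed change.
{code:scala}
import org.apache.spark.storage.StorageLevel

// Only cache if the caller has not already persisted docs
// (assumption: mirrors the handlePersistence idiom in other MLlib code).
val handlePersistence = docs.getStorageLevel == StorageLevel.NONE
if (handlePersistence) {
  docs.persist(StorageLevel.MEMORY_AND_DISK)
}

// ... existing body: build edges, docTermVertices, this.graph, docs.take(1) ...

// Once initialize() no longer needs docs, release the cache:
if (handlePersistence) {
  docs.unpersist()
}
{code}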

This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
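
For illustration, the general pattern CacheCheck flags is an RDD consumed by more than one job without persist(); the snippet below is a hypothetical standalone example (sc is a SparkContext), not code from Spark itself.
{code:scala}
val rdd = sc.textFile("input.txt").map(_.length) // not persisted
val total = rdd.reduce(_ + _) // job 1: computes rdd from the file
val first = rdd.take(1)       // job 2: recomputes rdd from the file
// Fix: rdd.persist() before the first job, rdd.unpersist() after the last.
{code}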


