You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2015/07/30 22:17:58 UTC

spark git commit: [SPARK-5567] [MLLIB] Add predict method to LocalLDAModel

Repository: spark
Updated Branches:
  refs/heads/master a20e743fb -> d8cfd531c


[SPARK-5567] [MLLIB] Add predict method to LocalLDAModel

jkbradley hhbyyh

Adds `topicDistributions` to LocalLDAModel. Please review after #7757 is merged.

Author: Feynman Liang <fl...@databricks.com>

Closes #7760 from feynmanliang/SPARK-5567-predict-in-LDA and squashes the following commits:

0ad1134 [Feynman Liang] Remove println
27b3877 [Feynman Liang] Code review fixes
6bfb87c [Feynman Liang] Remove extra newline
476f788 [Feynman Liang] Fix checks and doc for variationalInference
061780c [Feynman Liang] Code review cleanup
3be2947 [Feynman Liang] Rename topicDistribution -> topicDistributions
2a821a6 [Feynman Liang] Add predict methods to LocalLDAModel


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8cfd531
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8cfd531
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8cfd531

Branch: refs/heads/master
Commit: d8cfd531c7c50c9b00ab546be458f44f84c386ae
Parents: a20e743
Author: Feynman Liang <fl...@databricks.com>
Authored: Thu Jul 30 13:17:54 2015 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Thu Jul 30 13:17:54 2015 -0700

----------------------------------------------------------------------
 .../spark/mllib/clustering/LDAModel.scala       | 42 +++++++++++--
 .../spark/mllib/clustering/LDAOptimizer.scala   |  5 +-
 .../spark/mllib/clustering/LDASuite.scala       | 63 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8cfd531/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index ece2884..6cfad3f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -186,7 +186,6 @@ abstract class LDAModel private[clustering] extends Saveable {
  * This model stores only the inferred topics.
  * It may be used for computing topics for new documents, but it may give less accurate answers
  * than the [[DistributedLDAModel]].
- *
  * @param topics Inferred topics (vocabSize x k matrix).
  */
 @Experimental
@@ -221,9 +220,6 @@ class LocalLDAModel private[clustering] (
   // TODO
   // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???
 
-  // TODO:
-  // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
-
   /**
    * Calculate the log variational bound on perplexity. See Equation (16) in original Online
    * LDA paper.
@@ -269,7 +265,7 @@ class LocalLDAModel private[clustering] (
     // by topic (columns of lambda)
     val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t
 
-    var score = documents.filter(_._2.numActives > 0).map { case (id: Long, termCounts: Vector) =>
+    var score = documents.filter(_._2.numNonzeros > 0).map { case (id: Long, termCounts: Vector) =>
       var docScore = 0.0D
       val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference(
         termCounts, exp(Elogbeta), brzAlpha, gammaShape, k)
@@ -277,7 +273,7 @@ class LocalLDAModel private[clustering] (
 
       // E[log p(doc | theta, beta)]
       termCounts.foreachActive { case (idx, count) =>
-        docScore += LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
+        docScore += count * LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
       }
       // E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector
       docScore += sum((brzAlpha - gammad) :* Elogthetad)
@@ -297,6 +293,40 @@ class LocalLDAModel private[clustering] (
     score
   }
 
+  /**
+   * Infers, for every document, its mixture over topics ("theta" in the literature). A document
+   * with no non-zero term counts is mapped to a vector of zeros.
+   *
+   * Inference is the variational approximation of Hoffman et al. (2010), in which the
+   * approximate distribution is called "gamma."  The value returned for each document is that
+   * "gamma", L1-normalized.
+   * @param documents documents to predict topic mixture distributions for
+   * @return An RDD of (document ID, topic mixture distribution for document)
+   */
+  // TODO: declare in LDAModel and override once implemented in DistributedLDAModel
+  def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = {
+    // dirichletExpectation normalizes by row, but normalization is needed per topic (a column
+    // of lambda), hence the transpose before and after the call.
+    val topicsByRow = topicsMatrix.toBreeze.toDenseMatrix.t
+    val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsByRow).t)
+    // Local copies of the model fields that are used inside the map closure below.
+    val alphaBrz = this.docConcentration.toBreeze
+    val shape = this.gammaShape
+    val numTopics = this.k
+
+    documents.map { case (docId: Long, counts: Vector) =>
+      // Inference is only run when the document has at least one non-zero term count.
+      val mixture: Vector = if (counts.numNonzeros > 0) {
+        val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
+          counts, expElogbeta, alphaBrz, shape, numTopics)
+        Vectors.dense(normalize(gamma, 1.0).toArray)
+      } else {
+        Vectors.zeros(numTopics)
+      }
+      (docId, mixture)
+    }
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d8cfd531/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 4b90fbd..9dbec41 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -394,7 +394,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val gammaShape = this.gammaShape
 
     val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
-      val nonEmptyDocs = docs.filter(_._2.numActives > 0)
+      val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
 
       val stat = BDM.zeros[Double](k, vocabSize)
       var gammaPart = List[BDV[Double]]()
@@ -461,7 +461,8 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
 private[clustering] object OnlineLDAOptimizer {
   /**
    * Uses variational inference to infer the topic distribution `gammad` given the term counts
-   * for a document. `termCounts` must be non-empty, otherwise Breeze will throw a BLAS error.
+   * for a document. `termCounts` must contain at least one non-zero entry, otherwise Breeze will
+   * throw a BLAS error.
    *
    * An optimization (Lee, Seung: Algorithms for non-negative matrix factorization, NIPS 2001)
    * avoids explicit computation of variational parameter `phi`.

http://git-wip-us.apache.org/repos/asf/spark/blob/d8cfd531/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 61d2edf..d74482d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -242,6 +242,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     val alpha = 0.01
     val eta = 0.01
     val gammaShape = 100
+    // obtained from LDA model trained in gensim, see below
     val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
       1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
       0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
@@ -281,6 +282,68 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(ldaModel.logPerplexity(docs) ~== -3.690D relTol 1E-3D)
   }
 
+  test("LocalLDAModel predict") {
+    val k = 2
+    val vocabSize = 6
+    val alpha = 0.01
+    val eta = 0.01
+    val gammaShape = 100
+    // topics matrix obtained from an LDA model trained in gensim, see the script below
+    val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
+      1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
+      0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
+
+    def toydata: Array[(Long, Vector)] = Array(
+      Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+      Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+      Vectors.sparse(6, Array(4, 5), Array(1, 1))
+    ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+    val docs = sc.parallelize(toydata)
+
+    val ldaModel: LocalLDAModel = new LocalLDAModel(
+      topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape)
+
+    /* Verify results using gensim:
+       import numpy as np
+       from gensim import models
+       corpus = [
+          [(0, 1.0), (1, 1.0)],
+          [(1, 1.0), (2, 1.0)],
+          [(0, 1.0), (2, 1.0)],
+          [(3, 1.0), (4, 1.0)],
+          [(3, 1.0), (5, 1.0)],
+          [(4, 1.0), (5, 1.0)]]
+       np.random.seed(2345)
+       lda = models.ldamodel.LdaModel(
+          corpus=corpus, alpha=0.01, eta=0.01, num_topics=2, update_every=0, passes=100,
+          decay=0.51, offset=1024)
+       print(list(lda.get_document_topics(corpus)))
+       > [[(0, 0.99504950495049516)], [(0, 0.99504950495049516)],
+       > [(0, 0.99504950495049516)], [(1, 0.99504950495049516)],
+       > [(1, 0.99504950495049516)], [(1, 0.99504950495049516)]]
+     */
+
+    val expectedPredictions = List(
+      (0, 0.99504), (0, 0.99504),
+      (0, 0.99504), (1, 0.99504),
+      (1, 0.99504), (1, 0.99504))
+
+    // Reduce each prediction to the expectedPredictions format: (top topic, its probability).
+    val actualPredictions = ldaModel.topicDistributions(docs).map { case (id, topics) =>
+        val topicsBz = topics.toBreeze.toDenseVector
+        (id, (argmax(topicsBz), max(topicsBz)))
+      }.sortByKey().values.collect()
+
+    // zip truncates to the shorter side, so pin the length before comparing pairwise.
+    assert(actualPredictions.length === expectedPredictions.length)
+    expectedPredictions.zip(actualPredictions).foreach { case (expected, actual) =>
+      assert(expected._1 === actual._1 && (expected._2 ~== actual._2 relTol 1E-3D))
+    }
+  }
+
   test("OnlineLDAOptimizer with asymmetric prior") {
     def toydata: Array[(Long, Vector)] = Array(
       Vectors.sparse(6, Array(0, 1), Array(1, 1)),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org