You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2015/07/23 00:07:09 UTC

spark git commit: [SPARK-8536] [MLLIB] Generalize OnlineLDAOptimizer to asymmetric document-topic Dirichlet priors

Repository: spark
Updated Branches:
  refs/heads/master cf21d05f8 -> 1aca9c13c


[SPARK-8536] [MLLIB] Generalize OnlineLDAOptimizer to asymmetric document-topic Dirichlet priors

Modify `LDA` to take asymmetric document-topic prior distributions and `OnlineLDAOptimizer` to use the asymmetric prior during variational inference.

This PR only generalizes `OnlineLDAOptimizer` and the associated `LocalLDAModel`; `EMLDAOptimizer` and `DistributedLDAModel` still only support symmetric `alpha` (checked during `EMLDAOptimizer.initialize`).

Author: Feynman Liang <fl...@databricks.com>

Closes #7575 from feynmanliang/SPARK-8536-LDA-asymmetric-priors and squashes the following commits:

af8fbb7 [Feynman Liang] Fix merge errors
ef5821d [Feynman Liang] Merge remote-tracking branch 'apache/master' into SPARK-8536-LDA-asymmetric-priors
58f1d7b [Feynman Liang] Fix from review feedback
a6dcf70 [Feynman Liang] Change docConcentration interface and move LDAOptimizer validation to initialize, add sad path tests
72038ff [Feynman Liang] Add tests referenced against gensim
d4284fa [Feynman Liang] Generalize OnlineLDA to asymmetric priors, no tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1aca9c13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1aca9c13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1aca9c13

Branch: refs/heads/master
Commit: 1aca9c13c144fa336af6afcfa666128bf77c49d4
Parents: cf21d05
Author: Feynman Liang <fl...@databricks.com>
Authored: Wed Jul 22 15:07:05 2015 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Wed Jul 22 15:07:05 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/mllib/clustering/LDA.scala | 49 +++++++-----
 .../spark/mllib/clustering/LDAOptimizer.scala   | 27 +++++--
 .../spark/mllib/clustering/LDASuite.scala       | 82 ++++++++++++++++++--
 3 files changed, 126 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1aca9c13/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
index a410547..ab124e6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
@@ -23,11 +23,10 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaPairRDD
 import org.apache.spark.graphx._
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
-
 /**
  * :: Experimental ::
  *
@@ -49,14 +48,15 @@ import org.apache.spark.util.Utils
 class LDA private (
     private var k: Int,
     private var maxIterations: Int,
-    private var docConcentration: Double,
+    private var docConcentration: Vector,
     private var topicConcentration: Double,
     private var seed: Long,
     private var checkpointInterval: Int,
     private var ldaOptimizer: LDAOptimizer) extends Logging {
 
-  def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1,
-    seed = Utils.random.nextLong(), checkpointInterval = 10, ldaOptimizer = new EMLDAOptimizer)
+  def this() = this(k = 10, maxIterations = 20, docConcentration = Vectors.dense(-1),
+    topicConcentration = -1, seed = Utils.random.nextLong(), checkpointInterval = 10,
+    ldaOptimizer = new EMLDAOptimizer)
 
   /**
    * Number of topics to infer.  I.e., the number of soft cluster centers.
@@ -77,37 +77,50 @@ class LDA private (
    * Concentration parameter (commonly named "alpha") for the prior placed on documents'
    * distributions over topics ("theta").
    *
-   * This is the parameter to a symmetric Dirichlet distribution.
+   * This is the parameter to a Dirichlet distribution.
    */
-  def getDocConcentration: Double = this.docConcentration
+  def getDocConcentration: Vector = this.docConcentration
 
   /**
    * Concentration parameter (commonly named "alpha") for the prior placed on documents'
    * distributions over topics ("theta").
    *
-   * This is the parameter to a symmetric Dirichlet distribution, where larger values
-   * mean more smoothing (more regularization).
+   * This is the parameter to a Dirichlet distribution, where larger values mean more smoothing
+   * (more regularization).
    *
-   * If set to -1, then docConcentration is set automatically.
-   *  (default = -1 = automatic)
+   * If set to a singleton vector Vector(-1), then docConcentration is set automatically. If set to
+   * a singleton vector Vector(t) where t != -1, then t is replicated to a vector of length k during
+   * [[LDAOptimizer.initialize()]]. Otherwise, the [[docConcentration]] vector must be length k.
+   * (default = Vector(-1) = automatic)
    *
    * Optimizer-specific parameter settings:
    *  - EM
-   *     - Value should be > 1.0
-   *     - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
-   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
+   *     - Currently only supports symmetric distributions, so all values in the vector should be
+   *       the same.
+   *     - Values should be > 1.0
+   *     - default = uniformly (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
+   *       from Asuncion et al. (2009), who recommend a +1 adjustment for EM.
    *  - Online
-   *     - Value should be >= 0
-   *     - default = (1.0 / k), following the implementation from
+   *     - Values should be >= 0
+   *     - default = uniformly (1.0 / k), following the implementation from
    *       [[https://github.com/Blei-Lab/onlineldavb]].
    */
-  def setDocConcentration(docConcentration: Double): this.type = {
+  def setDocConcentration(docConcentration: Vector): this.type = {
     this.docConcentration = docConcentration
     this
   }
 
+  /** Sets a symmetric prior from a single Double, stored here as a singleton vector */
+  def setDocConcentration(docConcentration: Double): this.type = {
+    this.docConcentration = Vectors.dense(docConcentration)
+    this
+  }
+
   /** Alias for [[getDocConcentration]] */
-  def getAlpha: Double = getDocConcentration
+  def getAlpha: Vector = getDocConcentration
+
+  /** Alias for [[setDocConcentration()]] */
+  def setAlpha(alpha: Vector): this.type = setDocConcentration(alpha)
 
   /** Alias for [[setDocConcentration()]] */
   def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)

http://git-wip-us.apache.org/repos/asf/spark/blob/1aca9c13/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index b960ae6..f4170a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
-import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector}
+import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 
 /**
@@ -95,8 +95,11 @@ final class EMLDAOptimizer extends LDAOptimizer {
    * Compute bipartite term/doc graph.
    */
   override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
+    val docConcentration = lda.getDocConcentration(0)
+    require({
+      lda.getDocConcentration.toArray.forall(_ == docConcentration)
+    }, "EMLDAOptimizer currently only supports symmetric document-topic priors")
 
-    val docConcentration = lda.getDocConcentration
     val topicConcentration = lda.getTopicConcentration
     val k = lda.getK
 
@@ -229,10 +232,10 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
   private var vocabSize: Int = 0
 
   /** alias for docConcentration */
-  private var alpha: Double = 0
+  private var alpha: Vector = Vectors.dense(0)
 
   /** (private[clustering] for debugging)  Get docConcentration */
-  private[clustering] def getAlpha: Double = alpha
+  private[clustering] def getAlpha: Vector = alpha
 
   /** alias for topicConcentration */
   private var eta: Double = 0
@@ -343,7 +346,19 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     this.k = lda.getK
     this.corpusSize = docs.count()
     this.vocabSize = docs.first()._2.size
-    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
+    this.alpha = if (lda.getDocConcentration.size == 1) {
+      if (lda.getDocConcentration(0) == -1) Vectors.dense(Array.fill(k)(1.0 / k))
+      else {
+        require(lda.getDocConcentration(0) >= 0, s"all entries in alpha must be >=0, got: $alpha")
+        Vectors.dense(Array.fill(k)(lda.getDocConcentration(0)))
+      }
+    } else {
+      require(lda.getDocConcentration.size == k, s"alpha must have length k, got: $alpha")
+      lda.getDocConcentration.foreachActive { case (_, x) =>
+        require(x >= 0, s"all entries in alpha must be >= 0, got: $alpha")
+      }
+      lda.getDocConcentration
+    }
     this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
     this.randomGenerator = new Random(lda.getSeed)
 
@@ -372,7 +387,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
     val vocabSize = this.vocabSize
     val Elogbeta = dirichletExpectation(lambda).t
     val expElogbeta = exp(Elogbeta)
-    val alpha = this.alpha
+    val alpha = this.alpha.toBreeze
     val gammaShape = this.gammaShape
 
     val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1aca9c13/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 721a065..da70d9b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
 import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix, Vector, Vectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
@@ -132,22 +132,38 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("setter alias") {
     val lda = new LDA().setAlpha(2.0).setBeta(3.0)
-    assert(lda.getAlpha === 2.0)
-    assert(lda.getDocConcentration === 2.0)
+    assert(lda.getAlpha.toArray.forall(_ === 2.0))
+    assert(lda.getDocConcentration.toArray.forall(_ === 2.0))
     assert(lda.getBeta === 3.0)
     assert(lda.getTopicConcentration === 3.0)
   }
 
+  test("initializing with alpha length != k or 1 fails") {
+    intercept[IllegalArgumentException] {
+      val lda = new LDA().setK(2).setAlpha(Vectors.dense(1, 2, 3, 4))
+      val corpus = sc.parallelize(tinyCorpus, 2)
+      lda.run(corpus)
+    }
+  }
+
+  test("initializing with elements in alpha < 0 fails") {
+    intercept[IllegalArgumentException] {
+      val lda = new LDA().setK(4).setAlpha(Vectors.dense(-1, 2, 3, 4))
+      val corpus = sc.parallelize(tinyCorpus, 2)
+      lda.run(corpus)
+    }
+  }
+
   test("OnlineLDAOptimizer initialization") {
     val lda = new LDA().setK(2)
     val corpus = sc.parallelize(tinyCorpus, 2)
     val op = new OnlineLDAOptimizer().initialize(corpus, lda)
     op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau0(567)
-    assert(op.getAlpha == 0.5) // default 1.0 / k
-    assert(op.getEta == 0.5)   // default 1.0 / k
-    assert(op.getKappa == 0.9876)
-    assert(op.getMiniBatchFraction == 0.123)
-    assert(op.getTau0 == 567)
+    assert(op.getAlpha.toArray.forall(_ === 0.5)) // default 1.0 / k
+    assert(op.getEta === 0.5)   // default 1.0 / k
+    assert(op.getKappa === 0.9876)
+    assert(op.getMiniBatchFraction === 0.123)
+    assert(op.getTau0 === 567)
   }
 
   test("OnlineLDAOptimizer one iteration") {
@@ -218,6 +234,56 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
+  test("OnlineLDAOptimizer with asymmetric prior") {
+    def toydata: Array[(Long, Vector)] = Array(
+      Vectors.sparse(6, Array(0, 1), Array(1, 1)),
+      Vectors.sparse(6, Array(1, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(0, 2), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 4), Array(1, 1)),
+      Vectors.sparse(6, Array(3, 5), Array(1, 1)),
+      Vectors.sparse(6, Array(4, 5), Array(1, 1))
+    ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+
+    val docs = sc.parallelize(toydata)
+    val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau0(1024).setKappa(0.51)
+      .setGammaShape(1e10)
+    val lda = new LDA().setK(2)
+      .setDocConcentration(Vectors.dense(0.00001, 0.1))
+      .setTopicConcentration(0.01)
+      .setMaxIterations(100)
+      .setOptimizer(op)
+      .setSeed(12345)
+
+    val ldaModel = lda.run(docs)
+    val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
+    val topics = topicIndices.map { case (terms, termWeights) =>
+      terms.zip(termWeights)
+    }
+
+    /* Verify results with Python:
+
+       import numpy as np
+       from gensim import models
+       corpus = [
+           [(0, 1.0), (1, 1.0)],
+           [(1, 1.0), (2, 1.0)],
+           [(0, 1.0), (2, 1.0)],
+           [(3, 1.0), (4, 1.0)],
+           [(3, 1.0), (5, 1.0)],
+           [(4, 1.0), (5, 1.0)]]
+       np.random.seed(10)
+       lda = models.ldamodel.LdaModel(
+           corpus=corpus, alpha=np.array([0.00001, 0.1]), num_topics=2, update_every=0, passes=100)
+       lda.print_topics()
+
+       > ['0.167*0 + 0.167*1 + 0.167*2 + 0.167*3 + 0.167*4 + 0.167*5',
+          '0.167*0 + 0.167*1 + 0.167*2 + 0.167*4 + 0.167*3 + 0.167*5']
+     */
+    topics.foreach { topic =>
+      assert(topic.forall { case (_, p) => p ~= 0.167 absTol 0.05 })
+    }
+  }
+
   test("model save/load") {
     // Test for LocalLDAModel.
     val localModel = new LocalLDAModel(tinyTopics)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org