You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2018/05/17 00:54:11 UTC

spark git commit: [SPARK-22210][ML] Add seed for LDA variationalTopicInference

Repository: spark
Updated Branches:
  refs/heads/master 991726f31 -> bfd75cdfb


[SPARK-22210][ML] Add seed for LDA variationalTopicInference

## What changes were proposed in this pull request?

- Add a seed parameter to variationalTopicInference.

- Pass a seed when calling variationalTopicInference in submitMiniBatch.

- Add a var seed to LocalLDAModel so that it can take the seed from LDA and pass it to variationalTopicInference when called from logLikelihoodBound, topicDistributions, getTopicDistributionMethod, and topicDistribution.

## How was this patch tested?

Added a check to ml.clustering.LDASuite verifying that logLikelihood and logPerplexity are unchanged after saving and reloading the model, i.e. that the result is repeatable with the seed.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Lu WANG <lu...@databricks.com>

Closes #21183 from ludatabricks/SPARK-22210.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfd75cdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfd75cdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfd75cdf

Branch: refs/heads/master
Commit: bfd75cdfb22a8c2fb005da597621e1ccd3990e82
Parents: 991726f
Author: Lu WANG <lu...@databricks.com>
Authored: Wed May 16 17:54:06 2018 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Wed May 16 17:54:06 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ml/clustering/LDA.scala    |  6 ++-
 .../spark/mllib/clustering/LDAModel.scala       | 34 +++++++++++++---
 .../spark/mllib/clustering/LDAOptimizer.scala   | 42 +++++++++++---------
 .../apache/spark/ml/clustering/LDASuite.scala   |  6 +++
 4 files changed, 64 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bfd75cdf/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index afe599c..fed42c9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -569,10 +569,14 @@ abstract class LDAModel private[ml] (
 class LocalLDAModel private[ml] (
     uid: String,
     vocabSize: Int,
-    @Since("1.6.0") override private[clustering] val oldLocalModel: OldLocalLDAModel,
+    private[clustering] val oldLocalModel_ : OldLocalLDAModel,
     sparkSession: SparkSession)
   extends LDAModel(uid, vocabSize, sparkSession) {
 
+  override private[clustering] def oldLocalModel: OldLocalLDAModel = {
+    oldLocalModel_.setSeed(getSeed)
+  }
+
   @Since("1.6.0")
   override def copy(extra: ParamMap): LocalLDAModel = {
     val copied = new LocalLDAModel(uid, vocabSize, oldLocalModel, sparkSession)

http://git-wip-us.apache.org/repos/asf/spark/blob/bfd75cdf/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index b8a6e94..f915062 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -32,7 +32,7 @@ import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 
 /**
  * Latent Dirichlet Allocation (LDA) model.
@@ -194,6 +194,8 @@ class LocalLDAModel private[spark] (
     override protected[spark] val gammaShape: Double = 100)
   extends LDAModel with Serializable {
 
+  private var seed: Long = Utils.random.nextLong()
+
   @Since("1.3.0")
   override def k: Int = topics.numCols
 
@@ -216,6 +218,21 @@ class LocalLDAModel private[spark] (
 
   override protected def formatVersion = "1.0"
 
+  /**
+   * Random seed for cluster initialization.
+   */
+  @Since("2.4.0")
+  def getSeed: Long = seed
+
+  /**
+   * Set the random seed for cluster initialization.
+   */
+  @Since("2.4.0")
+  def setSeed(seed: Long): this.type = {
+    this.seed = seed
+    this
+  }
+
   @Since("1.5.0")
   override def save(sc: SparkContext, path: String): Unit = {
     LocalLDAModel.SaveLoadV1_0.save(sc, path, topicsMatrix, docConcentration, topicConcentration,
@@ -298,6 +315,7 @@ class LocalLDAModel private[spark] (
     // by topic (columns of lambda)
     val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t
     val ElogbetaBc = documents.sparkContext.broadcast(Elogbeta)
+    val gammaSeed = this.seed
 
     // Sum bound components for each document:
     //  component for prob(tokens) + component for prob(document-topic distribution)
@@ -306,7 +324,7 @@ class LocalLDAModel private[spark] (
         val localElogbeta = ElogbetaBc.value
         var docBound = 0.0D
         val (gammad: BDV[Double], _, _) = OnlineLDAOptimizer.variationalTopicInference(
-          termCounts, exp(localElogbeta), brzAlpha, gammaShape, k)
+          termCounts, exp(localElogbeta), brzAlpha, gammaShape, k, gammaSeed + id)
         val Elogthetad: BDV[Double] = LDAUtils.dirichletExpectation(gammad)
 
         // E[log p(doc | theta, beta)]
@@ -352,6 +370,7 @@ class LocalLDAModel private[spark] (
     val docConcentrationBrz = this.docConcentration.asBreeze
     val gammaShape = this.gammaShape
     val k = this.k
+    val gammaSeed = this.seed
 
     documents.map { case (id: Long, termCounts: Vector) =>
       if (termCounts.numNonzeros == 0) {
@@ -362,7 +381,8 @@ class LocalLDAModel private[spark] (
           expElogbetaBc.value,
           docConcentrationBrz,
           gammaShape,
-          k)
+          k,
+          gammaSeed + id)
         (id, Vectors.dense(normalize(gamma, 1.0).toArray))
       }
     }
@@ -376,6 +396,7 @@ class LocalLDAModel private[spark] (
     val docConcentrationBrz = this.docConcentration.asBreeze
     val gammaShape = this.gammaShape
     val k = this.k
+    val gammaSeed = this.seed
 
     (termCounts: Vector) =>
       if (termCounts.numNonzeros == 0) {
@@ -386,7 +407,8 @@ class LocalLDAModel private[spark] (
           expElogbeta,
           docConcentrationBrz,
           gammaShape,
-          k)
+          k,
+          gammaSeed)
         Vectors.dense(normalize(gamma, 1.0).toArray)
       }
   }
@@ -403,6 +425,7 @@ class LocalLDAModel private[spark] (
    */
   @Since("2.0.0")
   def topicDistribution(document: Vector): Vector = {
+    val gammaSeed = this.seed
     val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
     if (document.numNonzeros == 0) {
       Vectors.zeros(this.k)
@@ -412,7 +435,8 @@ class LocalLDAModel private[spark] (
         expElogbeta,
         this.docConcentration.asBreeze,
         gammaShape,
-        this.k)
+        this.k,
+        gammaSeed)
       Vectors.dense(normalize(gamma, 1.0).toArray)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfd75cdf/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 693a2a3..f8e5f3e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -464,6 +465,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
     val alpha = this.alpha.asBreeze
     val gammaShape = this.gammaShape
     val optimizeDocConcentration = this.optimizeDocConcentration
+    val seed = randomGenerator.nextLong()
     // If and only if optimizeDocConcentration is set true,
     // we calculate logphat in the same pass as other statistics.
     // No calculation of loghat happens otherwise.
@@ -473,20 +475,21 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
                                         None
                                       }
 
-    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
-      val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
-
-      val stat = BDM.zeros[Double](k, vocabSize)
-      val logphatPartOption = logphatPartOptionBase()
-      var nonEmptyDocCount: Long = 0L
-      nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
-        nonEmptyDocCount += 1
-        val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
-          termCounts, expElogbetaBc.value, alpha, gammaShape, k)
-        stat(::, ids) := stat(::, ids) + sstats
-        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
-      }
-      Iterator((stat, logphatPartOption, nonEmptyDocCount))
+    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitionsWithIndex {
+      (index, docs) =>
+        val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
+
+        val stat = BDM.zeros[Double](k, vocabSize)
+        val logphatPartOption = logphatPartOptionBase()
+        var nonEmptyDocCount: Long = 0L
+        nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+          nonEmptyDocCount += 1
+          val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
+            termCounts, expElogbetaBc.value, alpha, gammaShape, k, seed + index)
+          stat(::, ids) := stat(::, ids) + sstats
+          logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
+        }
+        Iterator((stat, logphatPartOption, nonEmptyDocCount))
     }
 
     val elementWiseSum = (
@@ -578,7 +581,8 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   }
 
   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
-    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose, alpha, eta, gammaShape)
+    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose, alpha, eta)
+      .setSeed(randomGenerator.nextLong())
   }
 
 }
@@ -605,18 +609,20 @@ private[clustering] object OnlineLDAOptimizer {
       expElogbeta: BDM[Double],
       alpha: breeze.linalg.Vector[Double],
       gammaShape: Double,
-      k: Int): (BDV[Double], BDM[Double], List[Int]) = {
+      k: Int,
+      seed: Long): (BDV[Double], BDM[Double], List[Int]) = {
     val (ids: List[Int], cts: Array[Double]) = termCounts match {
       case v: DenseVector => ((0 until v.size).toList, v.values)
       case v: SparseVector => (v.indices.toList, v.values)
     }
     // Initialize the variational distribution q(theta|gamma) for the mini-batch
+    val randBasis = new RandBasis(new org.apache.commons.math3.random.MersenneTwister(seed))
     val gammad: BDV[Double] =
-      new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k)                   // K
+      new Gamma(gammaShape, 1.0 / gammaShape)(randBasis).samplesVector(k)        // K
     val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad))  // K
     val expElogbetad = expElogbeta(ids, ::).toDenseMatrix                        // ids * K
 
-    val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100            // ids
+    val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100           // ids
     var meanGammaChange = 1D
     val ctsVector = new BDV[Double](cts)                                         // ids
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bfd75cdf/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index 8d728f0..4d84820 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -253,6 +253,12 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
     val lda = new LDA()
     testEstimatorAndModelReadWrite(lda, dataset, LDASuite.allParamSettings,
       LDASuite.allParamSettings, checkModelData)
+
+    // Make sure the result is deterministic after saving and loading the model
+    val model = lda.fit(dataset)
+    val model2 = testDefaultReadWrite(model)
+    assert(model.logLikelihood(dataset) ~== model2.logLikelihood(dataset) absTol 1e-6)
+    assert(model.logPerplexity(dataset) ~== model2.logPerplexity(dataset) absTol 1e-6)
   }
 
   test("read/write DistributedLDAModel") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org