Posted to reviews@spark.apache.org by hhbyyh <gi...@git.apache.org> on 2015/02/06 05:18:00 UTC

[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

GitHub user hhbyyh opened a pull request:

    https://github.com/apache/spark/pull/4419

    [SPARK-5563][mllib] online lda initial checkin

    JIRA: https://issues.apache.org/jira/browse/SPARK-5563
    The PR contains an implementation of [Online LDA](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf), based on the research of Matt Hoffman and David M. Blei, which provides an efficient option for LDA users. Its major advantages are streaming compatibility and economical time/memory consumption, since the corpus is processed in mini-batches.
    For more details, please refer to the JIRA.
    
    For reviewers:
    
    1. I made a minor change to the return type of `LDA.run` (from `DistributedLDAModel` to `LDAModel`), since `DistributedLDAModel` is graph-based.
    
    2. The current `LDA.run` interface is not efficient for the online algorithm. Online LDA can vectorize documents within each mini-batch and does not need to hold the whole corpus in memory.
    
    3. Currently I use `RDD.randomSplit` to split the corpus horizontally, which degrades performance (more than 10x slower). Is there a better way to do this? (See the sketch below for one alternative.)
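
    For reference, a minimal sketch of one alternative: sampling a fresh mini-batch per iteration instead of materializing all splits up front. `nextMiniBatch` and `miniBatchFraction` are illustrative names, not part of this PR's API:

        import org.apache.spark.mllib.linalg.Vector
        import org.apache.spark.rdd.RDD

        // Draw one mini-batch per call; only ~miniBatchFraction of the
        // corpus is touched, and nothing is pre-split or materialized.
        def nextMiniBatch(
            docs: RDD[(Long, Vector)],
            miniBatchFraction: Double,
            seed: Long): RDD[(Long, Vector)] = {
          docs.sample(withReplacement = true, miniBatchFraction, seed)
        }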
    
    Performance and result comparison with current EM implementation :
    The test data set repeats the following 6 documents 100 times:
      apple banana
      apple orange
      orange banana
      tiger cat
      cat dog
      tiger dog
    That is 600 documents and 1200 tokens in total, with a vocabulary size of 6; it is easy to reproduce on your PC. (A sketch of building this corpus follows.)
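
    For reference, a minimal sketch of building this toy corpus (assumptions for illustration: vocabulary order apple, banana, orange, tiger, cat, dog, and `sc` is an existing SparkContext):

        import org.apache.spark.mllib.linalg.Vectors

        // Bag-of-words counts for the 6 base documents.
        val base = Seq(
          Vectors.dense(1.0, 1.0, 0.0, 0.0, 0.0, 0.0), // apple banana
          Vectors.dense(1.0, 0.0, 1.0, 0.0, 0.0, 0.0), // apple orange
          Vectors.dense(0.0, 1.0, 1.0, 0.0, 0.0, 0.0), // orange banana
          Vectors.dense(0.0, 0.0, 0.0, 1.0, 1.0, 0.0), // tiger cat
          Vectors.dense(0.0, 0.0, 0.0, 0.0, 1.0, 1.0), // cat dog
          Vectors.dense(0.0, 0.0, 0.0, 1.0, 0.0, 1.0)  // tiger dog
        )
        // Repeat 100 times and index: 600 documents, 1200 tokens.
        val corpus = sc.parallelize(
          Seq.fill(100)(base).flatten.zipWithIndex.map {
            case (v, id) => (id.toLong, v)
          })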
    
    EM implementation: `lda.run(corpus)`, 30 iterations, automatic parameters:
    
    >    Corpus summary:
    >      Training set size: 600 documents
    >      Vocabulary size: 6 terms
    >      Training set size: 1200 tokens
    >      Preprocessing time: 3.965440641 sec
    >
    >    Finished training LDA model. Summary:
    >      Training time: 395.830773969 sec
    >
    >    2 topics:
    >    TOPIC 0
    >    banana  0.18063733648044106
    >    dog     0.17613878600129707
    >    apple   0.1696818853021358
    >    orange  0.1646544894546831
    >    tiger   0.15561684070970766
    >    cat     0.15327066205173534
    >
    >    TOPIC 1
    >    cat     0.18006269828910626
    >    tiger   0.17771651490103385
    >    orange  0.1686788479353741
    >    apple   0.16365144195225495
    >    dog     0.1571945282354216
    >    banana  0.15269596868680924
    
    
    Online LDA: `run(corpus, lda.LDAMode.Online)`:
    
    >    Corpus summary:
    >      Training set size: 600 documents
    >      Vocabulary size: 6 terms
    >      Training set size: 1200 tokens
    >      Preprocessing time: 4.035652719 sec
    >
    >    Finished training LDA model. Summary:
    >      Training time: 15.72914271 sec
    >
    >    2 topics:
    >    TOPIC 0
    >    apple   0.34047846308724955
    >    banana  0.3389019755911641
    >    orange  0.31774004408135487
    >    cat     9.773363079267432E-4
    >    dog     9.552721891145982E-4
    >    tiger   9.469087431901764E-4
    >
    >    TOPIC 1
    >    cat     0.3519694583370116
    >    tiger   0.3353643872639939
    >    dog     0.30993905428237273
    >    banana  9.528473706557286E-4
    >    apple   8.999766283570917E-4
    >    orange  8.742761176089969E-4
    
    The online version is faster and produces better results, owing to the nature of the algorithm (thanks to Matt Hoffman and David M. Blei).
    The version from https://github.com/hhbyyh/OnlineLDA_Spark is even faster than the online version above, mainly because it avoids the `randomSplit`. For the same input it takes less than 3 seconds, including SparkContext initialization and shutdown.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hhbyyh/spark ldaonline

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4419.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4419
    
----
commit d640d9c58cd4f3caa6eac462b947b3a891dabbda
Author: Yuhao Yang <hh...@gmail.com>
Date:   2015-02-06T03:12:49Z

    online lda initial checkin

----




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296387
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    --- End diff --
    
    This should be stored as a Long
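
    For illustration, a hedged sketch of the requested change (in line with the later revision of this patch, which renames the field to `corpusSize`):

        // Store the document count as a Long so large corpora do not overflow Int.
        private var corpusSize: Long = 0

        // RDD.count() already returns Long, so the .toInt truncation can simply go away.
        this.corpusSize = docs.count()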




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296359
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -98,13 +94,38 @@ class EMLDAOptimizer extends LDAOptimizer{
       /**
        * Compute bipartite term/doc graph.
        */
    -  private[clustering] override def initialState(
    -      docs: RDD[(Long, Vector)],
    -      k: Int,
    -      docConcentration: Double,
    -      topicConcentration: Double,
    -      randomSeed: Long,
    -      checkpointInterval: Int): LDAOptimizer = {
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA):
    +  LDAOptimizer = {
    +
    +    val docConcentration = lda.getDocConcentration
    +    val topicConcentration = lda.getTopicConcentration
    +    val k = lda.getK
    +
    +    /**
    --- End diff --
    
    Only 1 asterisk: ```/*``` since it's a comment, not documentation  (Please fix elsewhere too.)
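
    For illustration, the distinction being requested:

        /* Plain block comment: an implementation note, ignored by Scaladoc. */

        /** Scaladoc comment: documents the public API element that follows. */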




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97535777
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97399916
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97247950
  
    @hhbyyh Thanks for the update!  It's looking good, but there are some cleanups to do.  There are quite a few more I'd like to do, but since they are mostly Scala style issues and would take a long time to write out in comments, would you mind if I sent a PR to your PR?  Perhaps you can update this PR based on my current comments first.  Once that's done, I can send an update which does the small fixes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73197604
  
      [Test build #26901 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26901/consoleFull) for   PR 4419 at commit [`f41c5ca`](https://github.com/apache/spark/commit/f41c5ca2d2bb11394882d4212fd4138ae9a972a1).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97399896
  
      [Test build #31274 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31274/consoleFull) for   PR 4419 at commit [`138bfed`](https://github.com/apache/spark/commit/138bfeda88d1a6407bb6425b95f371fe40057977).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait LDAOptimizer `
      * `class EMLDAOptimizer extends LDAOptimizer `
      * `class OnlineLDAOptimizer extends LDAOptimizer `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29305239
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
    +    if(batch.isEmpty()) return this
    +
    +    val k = this.k
    +    val vocabSize = this.vocabSize
    +    val expElogbeta = this.expElogbeta
    +    val alpha = this.alpha
    +
    +    val stats = batch.mapPartitions(docs =>{
    +      val stat = BDM.zeros[Double](k, vocabSize)
    +      docs.foreach(doc =>{
    +        val termCounts = doc._2
    +        val (ids, cts) = termCounts match {
    +          case v: DenseVector => (((0 until v.size).toList), v.values)
    +          case v: SparseVector => (v.indices.toList, v.values)
    +          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
    +        }
    +
    +        // Initialize the variational distribution q(theta|gamma) for the mini-batch
    +        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
    +        var Elogthetad = digamma(gammad) - digamma(sum(gammad))     // 1 * K
    +        var expElogthetad = exp(Elogthetad)                         // 1 * K
    +        val expElogbetad = expElogbeta(::, ids).toDenseMatrix       // K * ids
    +
    +        var phinorm = expElogthetad * expElogbetad + 1e-100         // 1 * ids
    +        var meanchange = 1D
    +        val ctsVector = new BDV[Double](cts).t                      // 1 * ids
    +
    +        // Iterate between gamma and phi until convergence
    +        while (meanchange > 1e-5) {
    +          val lastgamma = gammad
    +          //        1*K                  1 * ids               ids * k
    +          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
    +          Elogthetad = digamma(gammad) - digamma(sum(gammad))
    +          expElogthetad = exp(Elogthetad)
    +          phinorm = expElogthetad * expElogbetad + 1e-100
    +          meanchange = sum(abs(gammad - lastgamma)) / k
    +        }
    +
    +        val m1 = expElogthetad.t.toDenseMatrix.t
    +        val m2 = (ctsVector / phinorm).t.toDenseMatrix
    +        val outerResult = kron(m1, m2) // K * ids
    +        for (i <- 0 until ids.size) {
    +          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
    +        }
    +        stat
    +      })
    +      Iterator(stat)
    +    })
    +
    +    val batchResult = stats.reduce(_ += _)
    +    update(batchResult, iteration, batchSize)
    +    batch.unpersist()
    +    this
    +  }
    +
    +  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
    +  }
    +
    +  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
    +
    +    val tau_0 = this.getTau_0
    +    val kappa = this.getKappa
    +
    +    // weight of the mini-batch.
    +    val weight = math.pow(tau_0 + iter, -kappa)
    +
    +    // This step finishes computing the sufficient statistics for the M step
    +    val stat = raw :* expElogbeta
    +
    +    // Update lambda based on documents.
    +    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight
    +    Elogbeta = dirichlet_expectation(lambda)
    +    expElogbeta = exp(Elogbeta)
    +  }
    +
    +  private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
    +    val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
    --- End diff --
    
    It looks like you can pass a RandBasis to Gamma: [http://www.scalanlp.org/api/breeze/index.html#breeze.stats.distributions.Gamma]
    
    And that RandBasis can be constructed using a RandomGenerator: [http://www.scalanlp.org/api/breeze/index.html#breeze.stats.distributions.RandBasis]
    
    And that RandomGenerator can use a seed.  A little complicated...but doable.
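
    For illustration, a sketch of that seeding chain (seed -> RandomGenerator -> RandBasis -> Gamma); the Gamma(100, 1/100) shape/scale follows the quoted diff, while `seededGammaMatrix` and the exact wiring are assumptions about how the fix could look:

        import breeze.linalg.{DenseMatrix => BDM}
        import breeze.stats.distributions.{Gamma, RandBasis}
        import org.apache.commons.math3.random.MersenneTwister

        // Fill a row x col matrix with Gamma(100, 1/100) draws from a seeded
        // source, so initialization is reproducible for a fixed seed.
        def seededGammaMatrix(row: Int, col: Int, seed: Long): BDM[Double] = {
          val randBasis = new RandBasis(new MersenneTwister(seed))
          val samples = Gamma(100.0, 1.0 / 100.0)(randBasis).sample(row * col)
          new BDM[Double](col, row, samples.toArray).t
        }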




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97506569
  
    Hi @jkbradley, regarding the last unit test, "OnlineLDAOptimizer with toy data": it takes about 300 ms on my PC, and I'm OK with dropping it if it's considered redundant or risky (I ran it about 2000 times and it always passed), though there is a theoretical possibility of failure.
    
    For the performance test, I used 20_newsGroups * 1000 (about 11,314,000 articles), which takes about 20 minutes on a 6-node cluster. (I believe there's still room for improvement.)
    
    Thanks for helping clean up the style, and for the important fixes in your PR.





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-78414645
  
      [Test build #28495 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28495/consoleFull) for   PR 4419 at commit [`02d0373`](https://github.com/apache/spark/commit/02d037387f32adcddd98858176813f3a66991a38).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296371
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    --- End diff --
    
    "Online LDA algorithm" --> "Online variational Bayes LDA algorithm"




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-85505276
  
    Updated; let me know if this is closer to what you have in mind. Thanks.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296400
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
    +    if(batch.isEmpty()) return this
    +
    +    val k = this.k
    +    val vocabSize = this.vocabSize
    +    val expElogbeta = this.expElogbeta
    +    val alpha = this.alpha
    +
    +    val stats = batch.mapPartitions(docs =>{
    +      val stat = BDM.zeros[Double](k, vocabSize)
    +      docs.foreach(doc =>{
    +        val termCounts = doc._2
    +        val (ids, cts) = termCounts match {
    +          case v: DenseVector => (((0 until v.size).toList), v.values)
    +          case v: SparseVector => (v.indices.toList, v.values)
    +          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
    +        }
    +
    +        // Initialize the variational distribution q(theta|gamma) for the mini-batch
    +        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
    +        var Elogthetad = digamma(gammad) - digamma(sum(gammad))     // 1 * K
    +        var expElogthetad = exp(Elogthetad)                         // 1 * K
    +        val expElogbetad = expElogbeta(::, ids).toDenseMatrix       // K * ids
    +
    +        var phinorm = expElogthetad * expElogbetad + 1e-100         // 1 * ids
    +        var meanchange = 1D
    +        val ctsVector = new BDV[Double](cts).t                      // 1 * ids
    +
    +        // Iterate between gamma and phi until convergence
    +        while (meanchange > 1e-5) {
    +          val lastgamma = gammad
    +          //        1*K                  1 * ids               ids * k
    +          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
    +          Elogthetad = digamma(gammad) - digamma(sum(gammad))
    +          expElogthetad = exp(Elogthetad)
    +          phinorm = expElogthetad * expElogbetad + 1e-100
    +          meanchange = sum(abs(gammad - lastgamma)) / k
    +        }
    +
    +        val m1 = expElogthetad.t.toDenseMatrix.t
    +        val m2 = (ctsVector / phinorm).t.toDenseMatrix
    +        val outerResult = kron(m1, m2) // K * ids
    +        for (i <- 0 until ids.size) {
    +          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
    +        }
    +        stat
    +      })
    +      Iterator(stat)
    +    })
    +
    +    val batchResult = stats.reduce(_ += _)
    +    update(batchResult, iteration, batchSize)
    +    batch.unpersist()
    +    this
    +  }
    +
    +  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
    +  }
    +
    +  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
    +
    +    val tau_0 = this.getTau_0
    +    val kappa = this.getKappa
    +
    +    // weight of the mini-batch.
    +    val weight = math.pow(tau_0 + iter, -kappa)
    +
    +    // This step finishes computing the sufficient statistics for the M step
    +    val stat = raw :* expElogbeta
    +
    +    // Update lambda based on documents.
    +    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight
    +    Elogbeta = dirichlet_expectation(lambda)
    +    expElogbeta = exp(Elogbeta)
    +  }
    +
    +  private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
    +    val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
    --- End diff --
    
    This should use a random seed generated by the java.util.Random instance




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-89109912
  
    @jkbradley Thanks for the quick response.
    
    Sure, I can start the comparison. Meanwhile, I've read your comments in the Gibbs PR and I agree with the general proposal. Let me know if you think we should get rid of the OnlineLDA class (the current builder version) and move the run function back into LDA.
    My only concern is whether it's appropriate to introduce several algorithm-specific arguments (miniBatchFraction, tau_0 and kappa) to LDA.
    
    
    





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73644693
  
      [Test build #27176 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27176/consoleFull) for   PR 4419 at commit [`0d0f3ee`](https://github.com/apache/spark/commit/0d0f3eef6d4e2754bfa2904f30bf9e21005ae392).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97282597
  
    Merged build started.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-86883524
  
      [Test build #29299 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29299/consoleFull) for   PR 4419 at commit [`97b9e1a`](https://github.com/apache/spark/commit/97b9e1abde2aaa57a692713d219b314000b51095).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73188654
  
      [Test build #26901 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26901/consoleFull) for   PR 4419 at commit [`f41c5ca`](https://github.com/apache/spark/commit/f41c5ca2d2bb11394882d4212fd4138ae9a972a1).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29305155
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    --- End diff --
    
    Yes, please, the comment helps.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29473550
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -202,9 +202,241 @@ class EMLDAOptimizer extends LDAOptimizer{
         graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _)
       }
     
    -  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
         require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
         this.graphCheckpointer.deleteAllCheckpoints()
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
    + * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
    + * distribution adaptively.
    + *
    + * Original Online LDA paper:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var corpusSize: Long = 0
    +  private var vocabSize: Int = 0
    +  private[clustering] var alpha: Double = 0
    +  private[clustering] var eta: Double = 0
    +  private var randomGenerator: java.util.Random = null
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = 1024
    +  private var kappa: Double = 0.51
    +  private var miniBatchFraction: Double = 0.01
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private[clustering] var lambda: BDM[Double] = null
    +
    +  // count of invocation to next, which helps deciding the weight for each iteration
    +  private var iteration: Int = 0
    +  private var gammaShape: Double = 100
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations. Larger values make early
    +   * iterations count less.
    +   */
    +  def getTau_0: Double = this.tau_0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations. Larger values make early
    +   * iterations count less.
    +   * Default: 1024, following the original Online LDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = this.kappa
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   * Default: 0.51, based on the original Online LDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch fraction, which sets the fraction of document sampled and used in each iteration
    +   */
    +  def getMiniBatchFraction: Double = this.miniBatchFraction
    +
    +  /**
    +   * Mini-batch fraction in (0, 1], which sets the fraction of document sampled and used in
    +   * each iteration.
    +   * Default: 0.01, i.e., 1% of total documents
    +   */
    +  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
    +    require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
    +      s"Online LDA miniBatchFraction must be in range (0,1], but was set to $miniBatchFraction")
    +    this.miniBatchFraction = miniBatchFraction
    +    this
    +  }
    +
    +  /**
    +   * The function is for test only now. In the future, it can help support training strop/resume
    +   */
    +  private[clustering] def setLambda(lambda: BDM[Double]): this.type = {
    +    this.lambda = lambda
    +    this
    +  }
    +
    +  /**
    +   * Used to control the gamma distribution. Larger value produces values closer to 1.0.
    +   */
    +  private[clustering] def setGammaShape(shape: Double): this.type = {
    +    this.gammaShape = shape
    +    this
    +  }
    +
    +  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA):
    +  OnlineLDAOptimizer = {
    +    this.k = lda.getK
    +    this.corpusSize = docs.count()
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomGenerator = new Random(lda.getSeed)
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  override private[clustering] def next(): OnlineLDAOptimizer = {
    +    val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
    +    if (batch.isEmpty()) return this
    +    submitMiniBatch(batch)
    +  }
    +
    +
    --- End diff --
    
    scala style: remove extra newline




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97656726
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-77060084
  
    I'd recommend `RDD.sample()` with replacement for sampling (as in the sketch earlier in this thread).




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296365
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -98,13 +94,38 @@ class EMLDAOptimizer extends LDAOptimizer{
       /**
        * Compute bipartite term/doc graph.
        */
    -  private[clustering] override def initialState(
    -      docs: RDD[(Long, Vector)],
    -      k: Int,
    -      docConcentration: Double,
    -      topicConcentration: Double,
    -      randomSeed: Long,
    -      checkpointInterval: Int): LDAOptimizer = {
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA):
    +  LDAOptimizer = {
    +
    +    val docConcentration = lda.getDocConcentration
    +    val topicConcentration = lda.getTopicConcentration
    +    val k = lda.getK
    +
    +    /**
    +     * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
    +     *       but values in (0,1) are not yet supported.
    +     */
    +    require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" +
    +      s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration")
    +    require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration " +
    +      s"must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration")
    +
    +    /**
    +     *  - For EM: default = (50 / k) + 1.
    +     *     - The 50/k is common in LDA libraries.
    +     *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
    +     */
    +    this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration
    +
    +    /**
    +     *  - For EM: default = 0.1 + 1.
    --- End diff --
    
    No need to duplicate this comment since it's already in the Scala doc above




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73181005
  
      [Test build #26895 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26895/consoleFull) for   PR 4419 at commit [`d640d9c`](https://github.com/apache/spark/commit/d640d9c58cd4f3caa6eac462b947b3a891dabbda).
     * This patch **fails Scala style tests**.
     * This patch **does not merge cleanly**.
     * This patch adds the following public classes _(experimental)_:
      * `  class OnlineLDAOptimizer(`





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-78420576
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28495/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97253799
  
    Also, there are a few to-do items:
    * unit tests
      * This is the big item.  Do you have an idea of how you plan to test this?  Some things, such as getters and setters, will be easy to test.  But the algorithm itself may be difficult.  Some possibilities are:
        * Break algorithm into pieces, and test each piece against hand-computed values.
        * Test 1 iteration of the algorithm with miniBatchFraction = 1.0 on a tiny dataset, and compare the output against values computed using Blei's code (or some other reference implementation); see the sketch after this list.
      * Also, Java tests will be nice to make sure the API works for Java.  These don't need to do much beyond calling all methods to make sure the method calls compile and run in Java.
    * example app: This would be nice to have and hopefully could involve a slight modification of the current LDAExample
    * programming guide update: This will be a small update to the LDA section in the clustering guide.
    
    The example app and programming guide can be in follow-up PRs, or in this one.
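
    As a rough sketch of the second idea (everything below is illustrative: `tinyCorpus` and `expectedTopics` are placeholders, the wiring assumes this patch's optimizer setter, and the reference values would come from running Blei's onlineldavb on the same input):
    ```
    test("one deterministic iteration vs. reference implementation") {
      val op = new OnlineLDAOptimizer().setMiniBatchFraction(1.0)
      val lda = new LDA().setK(2).setMaxIterations(1).setOptimizer(op)
      val model = lda.run(tinyCorpus)  // tinyCorpus: RDD[(Long, Vector)]
      // expectedTopics: topic matrix produced by the reference implementation
      assert(model.topicsMatrix.toArray.zip(expectedTopics.toArray)
        .forall { case (a, b) => math.abs(a - b) < 1e-5 })
    }
    ```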




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296393
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
    --- End diff --
    
    This batch is only used once, so you won't gain anything by caching it.
    
    randomSeed: Currently, this will use the same sample on every iteration.  You should create a java.util.Random instance, set the seed using randomSeed, and then use it on each iteration to create a new random seed to pass to sample().
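
    For example (a sketch against the fields in this diff; `rng` is a new, illustrative name):
    ```
    import java.util.Random

    // One generator, seeded once from the user-supplied randomSeed.
    private val rng = new Random(randomSeed)

    // Then inside next(): a fresh seed per iteration, so each call to
    // sample() draws a different mini-batch.
    val batch = docs.sample(true, batchSize.toDouble / D, rng.nextLong())
    ```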




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296355
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -100,13 +94,12 @@ class LDA private (
        *  - For EM: default = (50 / k) + 1.
        *     - The 50/k is common in LDA libraries.
        *     - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
    +   *  - For Online: default = (1.0 / k).
    +   *     - follows the implementation from: https://github.com/Blei-Lab/onlineldavb.
    --- End diff --
    
    Put double brackets around link:
    ```[[https://github.com/Blei-Lab/onlineldavb]]```




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296389
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    --- End diff --
    
    should be lda.randomSeed




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-88889477
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29607/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73652808
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27177/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76741787
  
    @jkbradley. I was on vacation for the last two weeks. I really appreciate the detailed comments, and I know how time-consuming they can be.
    
    * About the batch split: in the new commit I used docId % batchNumber to split the documents into `batchNumber` batches (see the sketch at the end of this comment). Will that work? I'm not sure I understand how stochastic gradient descent helps in this case.
    
    * Local vs. distributed models: indeed, the capacity of the current implementation is limited by the local matrix (lambda: vocabSize * k < 2^31 - 1). Since online LDA doesn't need to hold the entire corpus, the number of documents is not a concern. In each `seqOp` of the aggregate, the matrix in the calculation is bounded by k * ids, where ids is the number of terms in each document. So the problem is how to lift the limitation on lambda. My initial idea is to support a local matrix for now and add support for a distributed matrix in the future. I'll explore the upper limit of the current local matrix (a rough scale estimate is 100000 vocabulary terms * 1000 topics, with no limit on the number of documents).
    
    I made some changes according to the last two points. I'm not sure how to fit the current version into separate optimization steps; the code seems specific to LDA and hard to reuse in other contexts. Is there an example I can refer to? Thanks a lot.
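
    For reference, here is the shape of the modulo split mentioned above (a sketch using the PR's corpus type; `splitByDocId` is just an illustrative name):
    ```
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    // Partition the corpus into batchNumber disjoint mini-batches by doc id.
    def splitByDocId(
        docs: RDD[(Long, Vector)],
        batchNumber: Int): Seq[RDD[(Long, Vector)]] = {
      (0 until batchNumber).map { i =>
        docs.filter { case (docId, _) => docId % batchNumber == i.toLong }
      }
    }
    ```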
    
    
    





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97656685
  
      [Test build #31374 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31374/consoleFull) for   PR 4419 at commit [`68c2318`](https://github.com/apache/spark/commit/68c2318f4bba22d86a895f1c9b76e2a431c3241b).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait LDAOptimizer `
      * `class EMLDAOptimizer extends LDAOptimizer `
      * `class OnlineLDAOptimizer extends LDAOptimizer `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-94039816
  
    @hhbyyh Thank you very much for doing that!  It's really valuable to have an exact validation of correctness.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-88872014
  
      [Test build #29607 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29607/consoleFull) for   PR 4419 at commit [`d19ef55`](https://github.com/apache/spark/commit/d19ef558433b7b062b40332ea38ac869fb7eb0d5).




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296402
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
    +    if(batch.isEmpty()) return this
    +
    +    val k = this.k
    +    val vocabSize = this.vocabSize
    +    val expElogbeta = this.expElogbeta
    +    val alpha = this.alpha
    +
    +    val stats = batch.mapPartitions(docs =>{
    +      val stat = BDM.zeros[Double](k, vocabSize)
    +      docs.foreach(doc =>{
    +        val termCounts = doc._2
    +        val (ids, cts) = termCounts match {
    +          case v: DenseVector => (((0 until v.size).toList), v.values)
    +          case v: SparseVector => (v.indices.toList, v.values)
    +          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
    +        }
    +
    +        // Initialize the variational distribution q(theta|gamma) for the mini-batch
    +        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
    +        var Elogthetad = digamma(gammad) - digamma(sum(gammad))     // 1 * K
    +        var expElogthetad = exp(Elogthetad)                         // 1 * K
    +        val expElogbetad = expElogbeta(::, ids).toDenseMatrix       // K * ids
    +
    +        var phinorm = expElogthetad * expElogbetad + 1e-100         // 1 * ids
    +        var meanchange = 1D
    +        val ctsVector = new BDV[Double](cts).t                      // 1 * ids
    +
    +        // Iterate between gamma and phi until convergence
    +        while (meanchange > 1e-5) {
    +          val lastgamma = gammad
    +          //        1*K                  1 * ids               ids * k
    +          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
    +          Elogthetad = digamma(gammad) - digamma(sum(gammad))
    +          expElogthetad = exp(Elogthetad)
    +          phinorm = expElogthetad * expElogbetad + 1e-100
    +          meanchange = sum(abs(gammad - lastgamma)) / k
    +        }
    +
    +        val m1 = expElogthetad.t.toDenseMatrix.t
    +        val m2 = (ctsVector / phinorm).t.toDenseMatrix
    +        val outerResult = kron(m1, m2) // K * ids
    +        for (i <- 0 until ids.size) {
    +          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
    +        }
    +        stat
    +      })
    +      Iterator(stat)
    +    })
    +
    +    val batchResult = stats.reduce(_ += _)
    +    update(batchResult, iteration, batchSize)
    +    batch.unpersist()
    +    this
    +  }
    +
    +  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
    +  }
    +
    +  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
    +
    +    val tau_0 = this.getTau_0
    +    val kappa = this.getKappa
    +
    +    // weight of the mini-batch.
    +    val weight = math.pow(tau_0 + iter, -kappa)
    +
    +    // This step finishes computing the sufficient statistics for the M step
    +    val stat = raw :* expElogbeta
    +
    +    // Update lambda based on documents.
    +    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight
    +    Elogbeta = dirichlet_expectation(lambda)
    +    expElogbeta = exp(Elogbeta)
    +  }
    +
    +  private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
    +    val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
    +    val temp = gammaRandomGenerator.sample(row * col).toArray
    +    (new BDM[Double](col, row, temp)).t
    +  }
    +
    +  private def dirichlet_expectation(alpha : BDM[Double]): BDM[Double] = {
    --- End diff --
    
    Please add a little doc, even though it's an internal method
    
    Naming: please use camelCase instead of underscore




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-75618893
  
    @hhbyyh Thanks for the initial PR!  Here are some high-level comments:
    
    * RDD.sliding(): This may not take much advantage of parallelism.  It slides across the RDD partition by partition, meaning that only one (or a few) workers will be active on each iteration.  For the batch (RDD) setting, I wonder if it would be better to sample.  That would mean stochastic gradient descent, and it would hopefully be faster, since computing the gradient over the full corpus is expensive.  That would require some testing on an actual cluster to know for sure.
    
    * local vs. distributed models: The EM implementation supports very large vocabularies, where the topic distributions have to be distributed (the "term" vertices in the Graph).  It would be nice if the online LDA could support that too.  (I have heard of many use cases involving k and vocabSize large enough that the model would take many GB to store.)  However, I realize that storing the model (topics) locally is helpful for efficiency if the model is small enough.  Could you please sketch out how we might maintain a distributed model and the costs of doing that?
    
    * Returning DistributedLDAModel vs. LDAModel: It's true that online LDA should not return the current DistributedLDAModel since DistributedLDAModel has methods for returning info about the full training dataset.  That makes me wonder if we should have a different algorithm API for online LDA (OnlineLDA alongside LDA).  Does that sound reasonable?
    
    * code readability (though I know this is a WIP PR right now)
      * It will be helpful to have more comments and organization in the core optimization part of the code for reviewers to understand it.
      * Relatedly, it will be helpful to have the optimization steps (computing the gradient, computing the regularization, making the update, etc.) be separated out.  The optimization framework in MLlib is not suitable for you to use yet, probably, but hopefully it will be in the future (after this PR).  Separation of parts will help with those future changes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-77060378
  
    As far as understanding the code, it's really more for the benefit of future developers than for me.  Sticking with the layout in Hoffman's code is fine with me, but I suspect we'll refactor to use general gradient-based optimization methods at some point in the future.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97286892
  
    OK, I'll go ahead




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296391
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
    --- End diff --
    
    typo: "a a"




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97377127
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-86905181
  
      [Test build #29299 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29299/consoleFull) for   PR 4419 at commit [`97b9e1a`](https://github.com/apache/spark/commit/97b9e1abde2aaa57a692713d219b314000b51095).
     * This patch **passes all tests**.
    
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class OnlineLDAOptimizer (`





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29473857
  
    --- Diff: mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java ---
    @@ -109,6 +109,37 @@ public void distributedLDAModel() {
         assert(model.logPrior() < 0.0);
       }
     
    +
    +  @Test
    +  public void OnlineOptimizerCompatibility() {
    +      int k = 3;
    +      double topicSmoothing = 1.2;
    +      double termSmoothing = 1.2;
    +
    +      // Train a model
    +      OnlineLDAOptimizer op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51)
    +              .setGammaShape(1e40).setMiniBatchFraction(0.5);
    +      LDA lda = new LDA();
    --- End diff --
    
    Java style (2 space indentation):
    ```
    LDA lda = new LDA()
      .setK(k)
      .setDocConcentration(topicSmoothing)
    ```
    etc.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-87141115
  
    @hhbyyh  Thanks for the updates.
    
    **Builder pattern**: I recommend having OnlineLDA be a class (not an object) and using a builder pattern to specify parameters:
    ```
      val lda = new OnlineLDA().setK(50)
      val model = lda.run(myDataset)
    ```
    
    **Sampling**: It can be useful to make multiple passes over the dataset.  Instead of taking ```batchNumber```, could this take ```numIterations``` and ```miniBatchFraction``` (as in GradientDescent)?
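
    For concreteness, a sketch of the driver loop I have in mind (illustrative only: `corpus`, `numIterations`, `miniBatchFraction`, and `updateFromBatch` are placeholder names, not a proposed API):
    ```
    // numIterations passes; each pass samples a miniBatchFraction of the
    // corpus, mirroring the driver loop in GradientDescent.
    val rng = new java.util.Random(randomSeed)
    for (iter <- 1 to numIterations) {
      val batch = corpus.sample(true, miniBatchFraction, rng.nextLong())
      updateFromBatch(batch, iter)  // hypothetical per-mini-batch update step
    }
    ```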
    
    **Optimizer**: I like that you separated out the optimizer, but I think we should keep it private for now.
    
    **Work remaining**: I hope we can get this ready before long, but there are some major items remaining.  Some are obvious, but I wanted to list them here:
    * Parameter parity: It will be good to support the same parameters which LDA currently supports.
    * API: The main API questions are:
      * Should OnlineLDA be just another optimizer in the current LDA class?  (I like this, though the ```miniBatchFraction``` parameter would only be used for online learning, not batch EM.)
      * If so, then how do we need to adjust the LDAModel API?
    * Testing:
      * Unit tests
      * Scaling tests on a cluster with a large dataset
      * Correctness: It would be great to verify correctness vs. another implementation.  This would require both implementations running without any sampling to make the result deterministic.
    * Coding style: We'll need to do some significant cleanups to fit the Spark coding style guidelines.
    
    I'll think more about the API issues and update you soon.  Let me know if you have opinions about this too.  Thanks!




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97399917
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31274/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-93939245
  
    @jkbradley Here's an update on the correctness test.
    I have tested the current PR against https://github.com/Blei-Lab/onlineldavb and the results are identical. I've uploaded the results and code to https://github.com/hhbyyh/LDACrossValidation.
    
    I made some changes to remove randomness, such as initializing the matrix with fixed numbers from a file and replacing the batch sample with an even split.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97505081
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97637198
  
    Merged build started.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76880137
  
    How about randomSplit for the batch split?
    
    You may also refer to the Python version at http://www.cs.princeton.edu/~mdhoffma/ to better understand the code. I tried to stick to the original paper and implementation to ensure correctness.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97124052
  
      [Test build #31141 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31141/consoleFull) for   PR 4419 at commit [`b1178cf`](https://github.com/apache/spark/commit/b1178cfaad979f26ce02648795af65ecf08685c9).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait LDAOptimizer `
      * `class EMLDAOptimizer extends LDAOptimizer `
      * `class OnlineLDAOptimizer extends LDAOptimizer `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73652802
  
      [Test build #27177 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27177/consoleFull) for   PR 4419 at commit [`3a06526`](https://github.com/apache/spark/commit/3a06526df629b8ff1291bfb1b183f5e6af45bcde).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-86905203
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29299/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97282588
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296395
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocations of next(), used to help decide the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a subset (e.g. 1%, decided by the batchSize) of the corpus to the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
    +    if(batch.isEmpty()) return this
    +
    +    val k = this.k
    +    val vocabSize = this.vocabSize
    +    val expElogbeta = this.expElogbeta
    +    val alpha = this.alpha
    +
    +    val stats = batch.mapPartitions(docs =>{
    +      val stat = BDM.zeros[Double](k, vocabSize)
    +      docs.foreach(doc =>{
    +        val termCounts = doc._2
    +        val (ids, cts) = termCounts match {
    +          case v: DenseVector => (((0 until v.size).toList), v.values)
    +          case v: SparseVector => (v.indices.toList, v.values)
    +          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
    +        }
    +
    +        // Initialize the variational distribution q(theta|gamma) for the mini-batch
    +        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
    +        var Elogthetad = digamma(gammad) - digamma(sum(gammad))     // 1 * K
    +        var expElogthetad = exp(Elogthetad)                         // 1 * K
    +        val expElogbetad = expElogbeta(::, ids).toDenseMatrix       // K * ids
    +
    +        var phinorm = expElogthetad * expElogbetad + 1e-100         // 1 * ids
    +        var meanchange = 1D
    +        val ctsVector = new BDV[Double](cts).t                      // 1 * ids
    +
    +        // Iterate between gamma and phi until convergence
    +        while (meanchange > 1e-5) {
    +          val lastgamma = gammad
    +          //        1*K                  1 * ids               ids * k
    +          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
    +          Elogthetad = digamma(gammad) - digamma(sum(gammad))
    +          expElogthetad = exp(Elogthetad)
    +          phinorm = expElogthetad * expElogbetad + 1e-100
    +          meanchange = sum(abs(gammad - lastgamma)) / k
    +        }
    +
    +        val m1 = expElogthetad.t.toDenseMatrix.t
    +        val m2 = (ctsVector / phinorm).t.toDenseMatrix
    +        val outerResult = kron(m1, m2) // K * ids
    +        for (i <- 0 until ids.size) {
    +          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
    +        }
    +        stat
    +      })
    +      Iterator(stat)
    +    })
    +
    +    val batchResult = stats.reduce(_ += _)
    +    update(batchResult, iteration, batchSize)
    +    batch.unpersist()
    +    this
    +  }
    +
    +  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
    +  }
    +
    +  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
    --- End diff --
    
    Please add a little doc, even though it's an internal method
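    
    For context, a sketch of what `update` computes, in the notation of the Online LDA paper (mapping: `weight` is $\rho_t$, `batchSize` is $S$, `stat` is the expected sufficient statistics $\hat{\lambda}$ from the E step above, and $D$ is the corpus size):
    
    ```
    E step (per document d):
      \gamma_d \leftarrow \alpha + \exp\{E[\log\theta_d]\} \odot
                \left( \frac{n_d}{\phi_{norm}} \, \exp\{E[\log\beta]\}^{\top} \right)
    M step (per mini-batch t):
      \rho_t = (\tau_0 + t)^{-\kappa}
      \lambda \leftarrow (1 - \rho_t)\,\lambda + \rho_t \left( \eta + \frac{D}{S}\,\hat{\lambda} \right)
    ```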




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-84961719
  
      [Test build #28996 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28996/consoleFull) for   PR 4419 at commit [`f367cc9`](https://github.com/apache/spark/commit/f367cc90bcb19773b618ed3ca47b3529d8ee370c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97283442
  
    Hi @jkbradley ,
    
    I made some updates, and there are two things left, which I can submit later:
    1. the comment about running in batch mode.
    2. the gamma random generator.
    
    Please go ahead if there are more things that should be changed; meanwhile I'll be working on the unit tests.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97656728
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31374/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73197610
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26901/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97294476
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31206/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73644697
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27176/
    Test FAILed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-78413943
  
      [Test build #28494 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28494/consoleFull) for   PR 4419 at commit [`f6d47ca`](https://github.com/apache/spark/commit/f6d47ca3cddf0eae9ba5e8bfc5a5afdd5ed0d820).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76700029
  
      [Test build #28168 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28168/consoleFull) for   PR 4419 at commit [`581c623`](https://github.com/apache/spark/commit/581c623106f38d91497fb8123f47c4e661057071).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97637962
  
    For anyone interested, the code for running one iteration of Blei-lab's version is uploaded to:
    https://github.com/hhbyyh/onlineldavb/blob/oneIteration/onlineldavb.py.
    





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-88995458
  
    @hhbyyh Thanks for the results!  It looks like it's doing something reasonable, though of course it's always hard to tell.  How hard would it be for you to compare your implementation with [https://github.com/Blei-Lab/onlineldavb] (or some other Online LDA VB code) in a deterministic way?  If both could be run for the same number of iterations and use the full dataset on each iteration, then they should produce identical results.  That would be a great verification if possible.
    
    Also, let us know on the linked PR above if you have thoughts about the API updates being made there.  I just posted my thoughts at the bottom of that PR.
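    
    A minimal sketch of what the Spark side of such a deterministic run could look like, assuming the optimizer-based API this PR converges on (`setOptimizer`, `setMiniBatchFraction`); `corpus` is a placeholder, and exact agreement would additionally require pinning the random gamma initialization on both sides:
    
    ```
    import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}

    val op = new OnlineLDAOptimizer()
      .setMiniBatchFraction(1.0) // use the full corpus on every iteration
      .setTau_0(1024)
      .setKappa(0.51)
    val model = new LDA()
      .setK(10)
      .setMaxIterations(50)      // match the iteration count of the Python run
      .setOptimizer(op)
      .run(corpus)               // corpus: RDD[(Long, Vector)], placeholder
    ```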




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73180955
  
      [Test build #26895 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26895/consoleFull) for   PR 4419 at commit [`d640d9c`](https://github.com/apache/spark/commit/d640d9c58cd4f3caa6eac462b947b3a891dabbda).
     * This patch **does not merge cleanly**.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-89374500
  
    @hhbyyh I just posted a proposal on the other PR: [https://github.com/apache/spark/pull/4807]
    Does it sound workable?




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-78413946
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28494/
    Test FAILed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-88889456
  
      [Test build #29607 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/29607/consoleFull) for   PR 4419 at commit [`d19ef55`](https://github.com/apache/spark/commit/d19ef558433b7b062b40332ea38ac869fb7eb0d5).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class OnlineLDA(`
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29304515
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocations of next(), used to help decide the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a subset (e.g. 1%, decided by the batchSize) of the corpus to the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
    +    if(batch.isEmpty()) return this
    +
    +    val k = this.k
    +    val vocabSize = this.vocabSize
    +    val expElogbeta = this.expElogbeta
    +    val alpha = this.alpha
    +
    +    val stats = batch.mapPartitions(docs =>{
    +      val stat = BDM.zeros[Double](k, vocabSize)
    +      docs.foreach(doc =>{
    +        val termCounts = doc._2
    +        val (ids, cts) = termCounts match {
    +          case v: DenseVector => (((0 until v.size).toList), v.values)
    +          case v: SparseVector => (v.indices.toList, v.values)
    +          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
    +        }
    +
    +        // Initialize the variational distribution q(theta|gamma) for the mini-batch
    +        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
    +        var Elogthetad = digamma(gammad) - digamma(sum(gammad))     // 1 * K
    +        var expElogthetad = exp(Elogthetad)                         // 1 * K
    +        val expElogbetad = expElogbeta(::, ids).toDenseMatrix       // K * ids
    +
    +        var phinorm = expElogthetad * expElogbetad + 1e-100         // 1 * ids
    +        var meanchange = 1D
    +        val ctsVector = new BDV[Double](cts).t                      // 1 * ids
    +
    +        // Iterate between gamma and phi until convergence
    +        while (meanchange > 1e-5) {
    +          val lastgamma = gammad
    +          //        1*K                  1 * ids               ids * k
    +          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
    +          Elogthetad = digamma(gammad) - digamma(sum(gammad))
    +          expElogthetad = exp(Elogthetad)
    +          phinorm = expElogthetad * expElogbetad + 1e-100
    +          meanchange = sum(abs(gammad - lastgamma)) / k
    +        }
    +
    +        val m1 = expElogthetad.t.toDenseMatrix.t
    +        val m2 = (ctsVector / phinorm).t.toDenseMatrix
    +        val outerResult = kron(m1, m2) // K * ids
    +        for (i <- 0 until ids.size) {
    +          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
    +        }
    +        stat
    +      })
    +      Iterator(stat)
    +    })
    +
    +    val batchResult = stats.reduce(_ += _)
    +    update(batchResult, iteration, batchSize)
    +    batch.unpersist()
    +    this
    +  }
    +
    +  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
    +  }
    +
    +  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
    +
    +    val tau_0 = this.getTau_0
    +    val kappa = this.getKappa
    +
    +    // weight of the mini-batch.
    +    val weight = math.pow(tau_0 + iter, -kappa)
    +
    +    // This step finishes computing the sufficient statistics for the M step
    +    val stat = raw :* expElogbeta
    +
    +    // Update lambda based on documents.
    +    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight
    +    Elogbeta = dirichlet_expectation(lambda)
    +    expElogbeta = exp(Elogbeta)
    +  }
    +
    +  private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
    +    val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
    --- End diff --
    
    I didn't see Gamma can take a random seed, or you mean we should replace the Gamma here? 
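    
    (For reference: Breeze's `Gamma` constructor has no seed argument, but it does take an implicit `RandBasis`, which can be built over a seeded commons-math3 generator. A sketch, with `randomSeed: Long` assumed in scope:)
    
    ```
    import breeze.stats.distributions.{Gamma, RandBasis}
    import org.apache.commons.math3.random.MersenneTwister

    val randBasis = new RandBasis(new MersenneTwister(randomSeed))
    val gammaRandomGenerator = new Gamma(100.0, 1.0 / 100.0)(randBasis)
    val draws = gammaRandomGenerator.samplesVector(10) // reproducible draws
    ```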




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73185833
  
      [Test build #26899 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26899/consoleFull) for   PR 4419 at commit [`26dca1b`](https://github.com/apache/spark/commit/26dca1bddd98203e90e3cb36de4f3d16fbfbf6cc).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97274013
  
    @hhbyyh It will probably be better if we alternate to avoid conflicts, and mine should not take much time.  Could you please update the PR first?
    
    You don't need to give me write access.  I can send a PR against the branch you used to create this PR, and then you can review and merge it.  Merging it will automatically update this PR.
    
    The UT plan sounds good.  Just to be clear, we can't include a dataset, but a "small corpus" which can be stated in a few lines of code inside the unit test would work.  Thanks!




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76729536
  
      [Test build #28174 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28174/consoleFull) for   PR 4419 at commit [`e271eb1`](https://github.com/apache/spark/commit/e271eb1a0f6c329b05d3611abb3def1aeffc900e).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-87821996
  
    @hhbyyh 
    
    **API**
    
    I agree it will be important to allow users to use Online LDA in an online mini-batch setting, but I think the appropriate API for that is Spark Streaming.  I'd prefer to provide 2 public APIs: batch (as you have) and online (but using Streaming).  The method you wrote which takes a mini-batch can be kept private.
    
    If users have a model and want to update it with new data, we can provide an initialModel parameter they can use to warm-start training with their old model.
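    
    Schematically, that streaming path could look like the sketch below (purely hypothetical, in the spirit of StreamingKMeans; `submitMiniBatch` stands in for whatever per-mini-batch method stays private):
    
    ```
    import org.apache.spark.mllib.clustering.OnlineLDAOptimizer
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.streaming.dstream.DStream

    def trainOn(docs: DStream[(Long, Vector)], optimizer: OnlineLDAOptimizer): Unit = {
      docs.foreachRDD { batch =>
        // each micro-batch triggers one online update of lambda
        optimizer.submitMiniBatch(batch) // hypothetical method name
      }
    }
    ```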
    
    **Builder Pattern and Parameter parity**
    
    Sounds good: You're right that we should figure out the general API first.
    
    **Testing**
    
    For correctness, a nice dataset is the 20 newsgroups dataset.  The Stanford NLP group provides a great tutorial on using it:
    [http://nlp.stanford.edu/wiki/Software/Classifier/20_Newsgroups]
    * I recommend we follow the steps in the first section ("Getting set up with the data") to pre-process it.
    
    For scaling, you could just duplicate 20 newsgroups, or you could use a bigger dataset.
    * Evan Sparks provided a Wikipedia dump here: [s3://files.sparks.requester.pays/enwiki_category_text/ ] ("All in all there are 181 ~50mb files (actually closer to 10GB).")
    * The ACL machine translation workshops provide some big datasets. E.g.: [http://www.statmt.org/wmt14/translation-task.html]





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97282638
  
      [Test build #31206 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31206/consoleFull) for   PR 4419 at commit [`a996a82`](https://github.com/apache/spark/commit/a996a82e530a9e3d1465bb8872feb0ab70e579a3).




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29473555
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -202,9 +202,241 @@ class EMLDAOptimizer extends LDAOptimizer{
         graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _)
       }
     
    -  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
         require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
         this.graphCheckpointer.deleteAllCheckpoints()
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
    + * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
    + * distribution adaptively.
    + *
    + * Original Online LDA paper:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var corpusSize: Long = 0
    +  private var vocabSize: Int = 0
    +  private[clustering] var alpha: Double = 0
    +  private[clustering] var eta: Double = 0
    +  private var randomGenerator: java.util.Random = null
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = 1024
    +  private var kappa: Double = 0.51
    +  private var miniBatchFraction: Double = 0.01
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private[clustering] var lambda: BDM[Double] = null
    +
    +  // count of invocations of next(), which helps decide the weight for each iteration
    +  private var iteration: Int = 0
    +  private var gammaShape: Double = 100
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations. Larger values make early
    +   * iterations count less.
    +   */
    +  def getTau_0: Double = this.tau_0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations. Larger values make early
    +   * iterations count less.
    +   * Default: 1024, following the original Online LDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = this.kappa
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   * Default: 0.51, based on the original Online LDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch fraction, which sets the fraction of document sampled and used in each iteration
    +   */
    +  def getMiniBatchFraction: Double = this.miniBatchFraction
    +
    +  /**
    +   * Mini-batch fraction in (0, 1], which sets the fraction of document sampled and used in
    +   * each iteration.
    +   * Default: 0.01, i.e., 1% of total documents
    +   */
    +  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
    +    require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
    +      s"Online LDA miniBatchFraction must be in range (0,1], but was set to $miniBatchFraction")
    +    this.miniBatchFraction = miniBatchFraction
    +    this
    +  }
    +
    +  /**
    +   * The function is for test only now. In the future, it can help support training strop/resume
    +   */
    +  private[clustering] def setLambda(lambda: BDM[Double]): this.type = {
    +    this.lambda = lambda
    +    this
    +  }
    +
    +  /**
    +   * Used to control the gamma distribution. Larger value produces values closer to 1.0.
    +   */
    +  private[clustering] def setGammaShape(shape: Double): this.type = {
    +    this.gammaShape = shape
    +    this
    +  }
    +
    +  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA):
    --- End diff --
    
    scala style: If this can't fit on 1 line (100 chars), then put 1 argument per line:
    ```
    override private[clustering] def initialize(
        docs: RDD[(Long, Vector)],
        lda: LDA): OnlineLDAOptimizer = {
    ```




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97637305
  
      [Test build #31374 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31374/consoleFull) for   PR 4419 at commit [`68c2318`](https://github.com/apache/spark/commit/68c2318f4bba22d86a895f1c9b76e2a431c3241b).




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73644215
  
      [Test build #27176 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27176/consoleFull) for   PR 4419 at commit [`0d0f3ee`](https://github.com/apache/spark/commit/0d0f3eef6d4e2754bfa2904f30bf9e21005ae392).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97977337
  
    @hhbyyh  Thanks for the updates!  Apologies for the delayed review; I just got off a flight.  I just made a few tiny comments and will try to make a final pass later today or early tomorrow.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296374
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    --- End diff --
    
    "by each call to next" --> "on each iteration"




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97637188
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29473554
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -202,9 +202,241 @@ class EMLDAOptimizer extends LDAOptimizer{
         graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _)
       }
     
    -  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +  override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
         require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
         this.graphCheckpointer.deleteAllCheckpoints()
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
    + * algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
    + * distribution adaptively.
    + *
    + * Original Online LDA paper:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var corpusSize: Long = 0
    +  private var vocabSize: Int = 0
    +  private[clustering] var alpha: Double = 0
    +  private[clustering] var eta: Double = 0
    +  private var randomGenerator: java.util.Random = null
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = 1024
    +  private var kappa: Double = 0.51
    +  private var miniBatchFraction: Double = 0.01
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private[clustering] var lambda: BDM[Double] = null
    +
    +  // count of invocations of next(), which helps decide the weight for each iteration
    +  private var iteration: Int = 0
    +  private var gammaShape: Double = 100
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations. Larger values make early
    +   * iterations count less.
    +   */
    +  def getTau_0: Double = this.tau_0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations. Larger values make early
    +   * iterations count less.
    +   * Default: 1024, following the original Online LDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = this.kappa
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   * Default: 0.51, based on the original Online LDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch fraction, which sets the fraction of document sampled and used in each iteration
    +   */
    +  def getMiniBatchFraction: Double = this.miniBatchFraction
    +
    +  /**
    +   * Mini-batch fraction in (0, 1], which sets the fraction of document sampled and used in
    +   * each iteration.
    +   * Default: 0.01, i.e., 1% of total documents
    +   */
    +  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
    +    require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
    +      s"Online LDA miniBatchFraction must be in range (0,1], but was set to $miniBatchFraction")
    +    this.miniBatchFraction = miniBatchFraction
    +    this
    +  }
    +
    +  /**
    +   * The function is for test only now. In the future, it can help support training strop/resume
    --- End diff --
    
    typo: "strop" -> "stop"




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97377375
  
      [Test build #31274 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31274/consoleFull) for   PR 4419 at commit [`138bfed`](https://github.com/apache/spark/commit/138bfeda88d1a6407bb6425b95f371fe40057977).




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29473890
  
    --- Diff: mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java ---
    @@ -109,6 +109,37 @@ public void distributedLDAModel() {
         assert(model.logPrior() < 0.0);
       }
     
    +
    +  @Test
    +  public void OnlineOptimizerCompatibility() {
    +      int k = 3;
    +      double topicSmoothing = 1.2;
    +      double termSmoothing = 1.2;
    +
    +      // Train a model
    +      OnlineLDAOptimizer op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51)
    --- End diff --
    
    java style: 1 setX call per line (as in the comment a few lines below)




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97377137
  
    Merged build started.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97294473
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296380
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    --- End diff --
    
    State that larger values make early iterations count less.
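    
    For reference, the downweighting enters through the per-iteration weight rho_t = (tau_0 + t)^(-kappa) from the Online LDA paper. A minimal, Spark-free sketch (parameter values chosen only for illustration) showing that larger tau_0 values make early iterations count less:
    
    ```
    object Tau0Effect {
      // rho_t = (tau_0 + t)^(-kappa): the weight given to the mini-batch at iteration t
      def rho(tau0: Double, kappa: Double, t: Int): Double = math.pow(tau0 + t, -kappa)
    
      def main(args: Array[String]): Unit = {
        val kappa = 0.5
        for (tau0 <- Seq(1.0, 64.0, 1024.0)) {
          // Larger tau_0 shrinks the weight of the first iterations, so early
          // (noisy) mini-batches perturb lambda less.
          println(f"tau_0 = $tau0%6.1f  rho(t=1) = ${rho(tau0, kappa, 1)}%.4f  rho(t=100) = ${rho(tau0, kappa, 100)}%.4f")
        }
      }
    }
    ```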




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296386
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    --- End diff --
    
    Let's use "miniBatchFraction" to match GradientDescent.  Also, that way, the parameter default value can be problem-independent.
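    
    A sketch of what the suggested change might look like (the wrapper class is hypothetical, just to keep the snippet self-contained; the PR eventually adopted this setter with a default of 0.01, as quoted at the top of this thread):
    
    ```
    class OnlineLDAParams {
      // A fraction is problem-independent: the same default works for any corpus
      // size D, whereas an absolute batchSize would have to be rescaled per problem.
      private var miniBatchFraction: Double = 0.01
    
      def getMiniBatchFraction: Double = miniBatchFraction
    
      def setMiniBatchFraction(fraction: Double): this.type = {
        require(fraction > 0.0 && fraction <= 1.0,
          s"miniBatchFraction must be in (0, 1], but was set to $fraction")
        this.miniBatchFraction = fraction
        this
      }
    }
    ```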




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29473855
  
    --- Diff: mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java ---
    @@ -109,6 +109,37 @@ public void distributedLDAModel() {
         assert(model.logPrior() < 0.0);
       }
     
    +
    +  @Test
    +  public void OnlineOptimizerCompatibility() {
    +      int k = 3;
    +      double topicSmoothing = 1.2;
    +      double termSmoothing = 1.2;
    +
    +      // Train a model
    +      OnlineLDAOptimizer op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51)
    +              .setGammaShape(1e40).setMiniBatchFraction(0.5);
    --- End diff --
    
    java style: 2 space indentation (everywhere)




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-78420570
  
      [Test build #28495 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28495/consoleFull) for   PR 4419 at commit [`02d0373`](https://github.com/apache/spark/commit/02d037387f32adcddd98858176813f3a66991a38).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97077121
  
      [Test build #31141 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31141/consoleFull) for   PR 4419 at commit [`b1178cf`](https://github.com/apache/spark/commit/b1178cfaad979f26ce02648795af65ecf08685c9).




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97535780
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31296/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-78413863
  
      [Test build #28494 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28494/consoleFull) for   PR 4419 at commit [`f6d47ca`](https://github.com/apache/spark/commit/f6d47ca3cddf0eae9ba5e8bfc5a5afdd5ed0d820).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-78415124
  
    Using `sample` can actually help the performance. Thanks.
    
    About the general gradient-based optimization: totally agree. I'll do it as soon as I finish some urgent tasks at hand. Meanwhile, it would be great if I could collect some user feedback to help improve performance.
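    
    For context, a hedged sketch of the difference being discussed, using only the standard `RDD` API (`corpus` and the E-step body are placeholders): `randomSplit` shuffles the entire corpus into all of its splits up front, while `sample` lazily draws just the one mini-batch an iteration needs.
    
    ```
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD
    
    def iterate(corpus: RDD[(Long, Vector)], numIterations: Int,
        miniBatchFraction: Double, seed: Long): Unit = {
      for (iter <- 1 to numIterations) {
        // Draw one mini-batch; the rest of the corpus is left untouched.
        val batch = corpus.sample(withReplacement = true, fraction = miniBatchFraction, seed = seed + iter)
        // ... run the variational E-step on `batch` and fold the result into lambda
      }
    }
    ```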




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73647385
  
      [Test build #27177 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27177/consoleFull) for   PR 4419 at commit [`3a06526`](https://github.com/apache/spark/commit/3a06526df629b8ff1291bfb1b183f5e6af45bcde).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29303059
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    --- End diff --
    
    I'm OK with changing the default value to 0.51. As for allowing kappa = 0, there's a comment in Blei's Python code:
    "Note that if you pass the same set of D documents in every time and
    set kappa=0 this class can also be used to do batch VB." 
    I think we can keep that possibility. Shall I add the comment?
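    
    For what it's worth, the batch-VB remark follows directly from the update weight: with kappa = 0, rho_t = (tau_0 + t)^(-kappa) equals 1 for every t, so `lambda = lambda * (1 - weight) + newEstimate * weight` reduces to a full replacement. A quick sanity check:
    
    ```
    val tau0 = 1024.0
    val kappa = 0.0
    // With kappa = 0 every iteration gets weight 1.0, so feeding the same full
    // corpus each time performs batch variational Bayes.
    val weights = (1 to 5).map(t => math.pow(tau0 + t, -kappa))
    println(weights) // Vector(1.0, 1.0, 1.0, 1.0, 1.0)
    ```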




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-94041947
  
    Responding to comments in [https://github.com/apache/spark/pull/4807]:
    
    {quote}
    Another question is about existing parameters in LDA:
    Except K, all other parameters (Alpha, Beta, Maxiteration, seed, checkPointInterval) are useless or have different default values for Online LDA. I'm not sure if we should move all those parameters to EM optimizer.
    {quote}
    --> I disagree.  OnlineLDA could take most of these parameters, with caveats:
    * alpha, beta: These are hyperparameters of LDA.  EM does not estimate these, but it could be modified to estimate them.  The Online LDA algorithm you are following estimates these.  I'd recommend:
      * LDA takes these parameters as fixed values.
      * Online LDA takes a special parameter ```estimateAlphaBeta: Boolean``` which indicates whether or not it should estimate these hyperparameters.  In the implementation, it should be easy to update or not update these values.
    * maxIteration
      * As I suggested before, I'd recommend that OnlineLDA take ```numIterations``` and ```miniBatchFraction``` instead of ```batchNumber``` (to mimic GradientDescent).  ```numIterations``` will be shared by all LDA algorithms, but ```miniBatchFraction``` will be specific to OnlineLDA.
    * seed: OnlineLDA uses randomness in sampling and should use a random seed.
    
    I agree that ```checkpointInterval``` is not applicable to Online LDA.
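    
    A hedged sketch of the parameter surface this recommendation implies (builder names follow the discussion; `setEstimateAlphaBeta` is only proposed here, and attaching the optimizer via a `setOptimizer` hook on `LDA` is an assumption, not yet merged):
    
    ```
    val optimizer = new OnlineLDAOptimizer()
      .setTau_0(1024)
      .setKappa(0.51)
      .setMiniBatchFraction(0.05)    // specific to OnlineLDA, mimicking GradientDescent
      // .setEstimateAlphaBeta(true) // hypothetical, per the recommendation above
    
    val lda = new LDA()
      .setK(10)
      .setMaxIterations(100)         // shared by all LDA algorithms
      .setSeed(12345L)               // OnlineLDA uses randomness in sampling
      .setOptimizer(optimizer)
    ```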
    
    {quote}
    Actually I find LDA and OnlineLDA share quite few things and it's kind of difficult to merge them together. Maybe for OnlineLDA, separating it to another File is a better choice . (Later I'll provide an interface / example for stream).
    {quote}
    I agree that having the interface and the different algorithms in separate files is probably best.





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76746335
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28174/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296360
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -98,13 +94,38 @@ class EMLDAOptimizer extends LDAOptimizer{
       /**
        * Compute bipartite term/doc graph.
        */
    -  private[clustering] override def initialState(
    -      docs: RDD[(Long, Vector)],
    -      k: Int,
    -      docConcentration: Double,
    -      topicConcentration: Double,
    -      randomSeed: Long,
    -      checkpointInterval: Int): LDAOptimizer = {
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA):
    +  LDAOptimizer = {
    +
    +    val docConcentration = lda.getDocConcentration
    +    val topicConcentration = lda.getTopicConcentration
    +    val k = lda.getK
    +
    +    /**
    +     * Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
    +     *       but values in (0,1) are not yet supported.
    +     */
    +    require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" +
    +      s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration")
    +    require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration " +
    +      s"must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration")
    +
    +    /**
    +     *  - For EM: default = (50 / k) + 1.
    --- End diff --
    
    No need to duplicate this comment since it's already in the Scala doc above




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97505101
  
    Merged build started.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76700153
  
      [Test build #28168 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28168/consoleFull) for   PR 4419 at commit [`581c623`](https://github.com/apache/spark/commit/581c623106f38d91497fb8123f47c4e661057071).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76700169
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28168/
    Test FAILed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-87619172
  
    Thanks for the informative feedback. I sincerely appreciate it when you tell me what's recommended and what should be changed.
    
    ##### 1. First thing is API. 
    
    One great thing about Online LDA is that it can avoid loading the entire corpus, since it only needs to process one mini-batch at a time. Thus I feel it's necessary to have an API that supports this usage.
    In the current edition, a user can write code like
    ```
    // corpus does not need to be ready before this
    val onlineLDA = new OnlineLDAOptimizer(k, D, vocabSize)
    for (i <- 1 to batchNumber) {
      val batch = // ... convert dynamically or read libsvm directly
      onlineLDA.submitMiniBatch(batch)
    }
    ```
    I think this will be especially necessary and helpful for larger data sets, since doc2vec at large scale is resource intensive. And having a stream of small `documents: RDD[(Long, Vector)]` mini-batches rather than an integrated corpus is a key reason why OnlineLDA can handle larger datasets and stay stream-friendly.
    This is why I left the optimizer public. I'd like to know your opinions.
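    
    To make the streaming case concrete, here is a sketch against Spark Streaming's `foreachRDD` (`submitMiniBatch` is the proposed, not-yet-merged method from the snippet above, and vectorizing the raw text is elided):
    
    ```
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.streaming.dstream.DStream
    
    // Each micro-batch of already-vectorized documents becomes one online-LDA
    // mini-batch, so the full corpus never needs to be materialized.
    def trainOnStream(onlineLDA: OnlineLDAOptimizer, docStream: DStream[(Long, Vector)]): Unit = {
      docStream.foreachRDD { batch =>
        if (!batch.isEmpty()) {
          onlineLDA.submitMiniBatch(batch) // hypothetical API from the proposal above
        }
      }
    }
    ```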
    
    ##### 2. Builder Pattern and Parameter parity
    
        Sure, it's doable. Originally I named `OnlineLDAOptimizer` just `OnlineLDA`, and then I thought about our discussion of the optimizer framework, so I changed it. If we can lock down the API, it will be pretty clear how to proceed with these details.
    
    ##### 3. About scaling and correctness testing: can you please share a recommended dataset?
    
    Thanks a lot.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76746314
  
      [Test build #28174 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28174/consoleFull) for   PR 4419 at commit [`e271eb1`](https://github.com/apache/spark/commit/e271eb1a0f6c329b05d3611abb3def1aeffc900e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296397
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
    +    if(batch.isEmpty()) return this
    +
    +    val k = this.k
    +    val vocabSize = this.vocabSize
    +    val expElogbeta = this.expElogbeta
    +    val alpha = this.alpha
    +
    +    val stats = batch.mapPartitions(docs =>{
    +      val stat = BDM.zeros[Double](k, vocabSize)
    +      docs.foreach(doc =>{
    +        val termCounts = doc._2
    +        val (ids, cts) = termCounts match {
    +          case v: DenseVector => (((0 until v.size).toList), v.values)
    +          case v: SparseVector => (v.indices.toList, v.values)
    +          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
    +        }
    +
    +        // Initialize the variational distribution q(theta|gamma) for the mini-batch
    +        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
    +        var Elogthetad = digamma(gammad) - digamma(sum(gammad))     // 1 * K
    +        var expElogthetad = exp(Elogthetad)                         // 1 * K
    +        val expElogbetad = expElogbeta(::, ids).toDenseMatrix       // K * ids
    +
    +        var phinorm = expElogthetad * expElogbetad + 1e-100         // 1 * ids
    +        var meanchange = 1D
    +        val ctsVector = new BDV[Double](cts).t                      // 1 * ids
    +
    +        // Iterate between gamma and phi until convergence
    +        while (meanchange > 1e-5) {
    +          val lastgamma = gammad
    +          //        1*K                  1 * ids               ids * k
    +          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
    +          Elogthetad = digamma(gammad) - digamma(sum(gammad))
    +          expElogthetad = exp(Elogthetad)
    +          phinorm = expElogthetad * expElogbetad + 1e-100
    +          meanchange = sum(abs(gammad - lastgamma)) / k
    +        }
    +
    +        val m1 = expElogthetad.t.toDenseMatrix.t
    +        val m2 = (ctsVector / phinorm).t.toDenseMatrix
    +        val outerResult = kron(m1, m2) // K * ids
    +        for (i <- 0 until ids.size) {
    +          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
    +        }
    +        stat
    +      })
    +      Iterator(stat)
    +    })
    +
    +    val batchResult = stats.reduce(_ += _)
    +    update(batchResult, iteration, batchSize)
    +    batch.unpersist()
    +    this
    +  }
    +
    +  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
    +  }
    +
    +  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
    +
    +    val tau_0 = this.getTau_0
    +    val kappa = this.getKappa
    +
    +    // weight of the mini-batch.
    +    val weight = math.pow(tau_0 + iter, -kappa)
    +
    +    // This step finishes computing the sufficient statistics for the M step
    +    val stat = raw :* expElogbeta
    +
    +    // Update lambda based on documents.
    +    lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + eta) * weight
    +    Elogbeta = dirichlet_expectation(lambda)
    +    expElogbeta = exp(Elogbeta)
    +  }
    +
    +  private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
    --- End diff --
    
    Please add a little doc, even though it's an internal method
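    
    For example, the helper from the diff could carry a short doc like this (a sketch; `Gamma` is `breeze.stats.distributions.Gamma` as in the quoted code, and the object wrapper only keeps the snippet self-contained):
    
    ```
    import breeze.linalg.{DenseMatrix => BDM}
    import breeze.stats.distributions.Gamma
    
    object GammaInit {
      /**
       * Draws a row x col matrix of Gamma(100, 1/100) samples (mean 1.0), used to
       * randomly initialize the variational parameter lambda for q(beta|lambda).
       */
      def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
        val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
        val values = gammaRandomGenerator.sample(row * col).toArray
        new BDM[Double](row, col, values)
      }
    }
    ```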




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73185880
  
      [Test build #26899 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26899/consoleFull) for   PR 4419 at commit [`26dca1b`](https://github.com/apache/spark/commit/26dca1bddd98203e90e3cb36de4f3d16fbfbf6cc).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-84990737
  
      [Test build #28996 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28996/consoleFull) for   PR 4419 at commit [`f367cc9`](https://github.com/apache/spark/commit/f367cc90bcb19773b618ed3ca47b3529d8ee370c).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73181007
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26895/
    Test FAILed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-88872665
  
    I tried it with 20_newsgroups on my dev PC with master=local
         lda.setK(20).setMiniBatchFraction(0.02).setNumIterations(50).setTau_0(1)
    
    Corpus summary:
    	 Training set size: 11314 documents
    	 Vocabulary size: 75720 terms
    	 Training set size: 1319189 tokens
    	 Preprocessing time: 5.269933486 sec
    
    15/04/02 16:47:35 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
    15/04/02 16:47:35 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
    Finished training LDA model.  Summary:
    	 Training time: 76.832266468 sec
    20 topics:
    TOPIC 0
    turkish	0.012058716788185058
    armenians	0.011981509726531264
    armenian	0.010004235150011833
    armenia	0.006114108792354288
    world	0.005581069826432254
    women	0.005329639527608472
    war	0.005310177119620838
    turks	0.005239327705142824
    turkey	0.00515853811651157
    jason	0.005097117216929361
    
    TOPIC 1
    key	0.01316648107372953
    chip	0.010541809566170054
    government	0.010153778875200192
    encryption	0.009326779907233067
    clipper	0.008439466049444127
    public	0.007530569438593113
    server	0.007261400075345819
    system	0.006687091520336047
    technology	0.006480600341755369
    information	0.006316195503698238
    
    TOPIC 2
    jews	0.009702004869705672
    book	0.007853214980848604
    law	0.0072516029853666045
    jewish	0.006976932023763277
    day	0.005739847653103481
    children	0.005001342704817967
    writes	0.00493066449176571
    father	0.0046775540842062535
    subject	0.004526731671596464
    word	0.004261478063021758
    
    TOPIC 3
    lines	0.0119093867472742
    window	0.011427052669067456
    subject	0.011425847994190297
    bike	0.011017175200925428
    organization	0.010201230717622143
    dod	0.010043030759882773
    number	0.007815779704716674
    writes	0.007217862379522699
    comments	0.007134599945200453
    jeff	0.006439084192687904
    
    TOPIC 4
    lines	0.012697140646928512
    subject	0.012137529322560533
    health	0.011682611293142414
    organization	0.011528032382047063
    child	0.009628843075919892
    writes	0.0094157758574453
    tax	0.009043295916954986
    scott	0.008076554420454728
    cramer	0.007101397643094552
    uunet	0.00675596330842285
    
    TOPIC 5
    writes	0.00775952426885881
    subject	0.006813048810540294
    lines	0.006104476761875572
    organization	0.005870115294116984
    time	0.005372893507922081
    make	0.005372502622392409
    things	0.005244802597423591
    space	0.004842330516924673
    work	0.0041157380189356845
    find	0.004003129079883294
    
    TOPIC 6
    god	0.03751706271018263
    jesus	0.01809338668630806
    bible	0.01273876849278002
    christian	0.011515937636034564
    truth	0.010207481599140669
    christians	0.009734060715720241
    true	0.009484256732270106
    faith	0.009472189499476388
    christ	0.008925357041998224
    subject	0.008794316465052065
    
    TOPIC 7
    israel	0.012796192036701572
    year	0.010460517189457452
    israeli	0.009566052338340451
    writes	0.00768129607686239
    young	0.006786117523012472
    subject	0.006212594045699227
    player	0.006201122596017433
    years	0.005983312056044822
    lines	0.005965734811237885
    organization	0.00588173227558923
    
    TOPIC 8
    organization	0.009066382601536617
    writes	0.009034865041789575
    subject	0.008994865803746694
    lines	0.008685308635338337
    good	0.007260135125978055
    time	0.006539877322034039
    kill	0.005511957186284815
    long	0.005373592823169537
    day	0.005142388760559978
    made	0.0048908894669731995
    
    TOPIC 9
    file	0.02061426397921947
    subject	0.01848451880060305
    lines	0.018451157015520685
    organization	0.017565345699419276
    files	0.013872841520549063
    university	0.011910930778210866
    version	0.011640873128443674
    mac	0.010282403737157388
    program	0.010009092176160198
    software	0.009597634369517759
    
    TOPIC 10
    writes	0.015805209959701905
    organization	0.014876675058153706
    subject	0.01470540557262385
    lines	0.01344357702892899
    bill	0.0095130114583992
    texas	0.007206431218924633
    michael	0.007178008874528052
    moon	0.007036346761009455
    university	0.00653090859458465
    msg	0.006096418108324479
    
    TOPIC 11
    game	0.014674661401040233
    team	0.013191146246910222
    play	0.01010707995403076
    hockey	0.00763938300173508
    games	0.007147533054310146
    league	0.006995082062115003
    win	0.006886276134145244
    nhl	0.006352039295683316
    season	0.006324133614914862
    year	0.006066287307435158
    
    TOPIC 12
    media	0.009886769239547982
    number	0.007791276679198658
    applications	0.007464461853157522
    printer	0.0069284277659139625
    slow	0.0063737660373687915
    final	0.0063017559062341685
    display	0.006202170194951673
    font	0.005918182002639881
    weapon	0.005859511598321789
    line	0.005673207694267071
    
    TOPIC 13
    lines	0.019038844609056707
    subject	0.01867889041065237
    organization	0.01863154404976698
    university	0.01052806373066347
    modem	0.007296262423776374
    insurance	0.006706286366160585
    ohio	0.0055479057363888645
    shipping	0.005144207558982205
    keywords	0.004719857675933903
    valid	0.004592917999173226
    
    TOPIC 14
    lines	0.027743859508015972
    subject	0.027223348067573386
    organization	0.02595495630938305
    windows	0.024189290961090792
    drive	0.017022504545593013
    university	0.015605362805097134
    card	0.014533215563853688
    problem	0.011159934238387473
    disk	0.010699944070108252
    dos	0.009886396242470662
    
    TOPIC 15
    max	0.08874578618327673
    air	0.01135975971362627
    water	0.010048845884928805
    ground	0.009358364099568165
    wiring	0.008658287879779946
    bhj	0.007680045983477506
    hot	0.006288355248206904
    giz	0.006246027066244181
    grant	0.00524119552915761
    bxn	0.004902311521825763
    
    TOPIC 16
    subject	0.013968368891644113
    organization	0.013592128327318657
    lines	0.013135179142445825
    university	0.006997169480004322
    drivers	0.006895162869223282
    computer	0.006763309415528489
    list	0.0066359312429406445
    internet	0.006075399371871774
    mail	0.006014434970537192
    writes	0.005602600081269788
    
    TOPIC 17
    keyboard	0.01223302361505956
    window	0.00898082252199221
    motif	0.008967087211735314
    clock	0.007854529186790353
    subject	0.0076639431371468775
    lines	0.007204139008145851
    page	0.006974667674085922
    library	0.0061012994381090205
    organization	0.006094251981423491
    application	0.00589426046208701
    
    TOPIC 18
    players	0.010997334837545092
    subject	0.010768729154177278
    lines	0.01076239370321377
    organization	0.010695482778729407
    speed	0.010097718273861734
    writes	0.009870602880910849
    wire	0.009020100414062154
    car	0.008602098673758638
    cars	0.007082965832769178
    good	0.0069138751648267485
    
    TOPIC 19
    gun	0.01041215866197905
    police	0.007151858743131414
    weapons	0.006701168281112015
    cover	0.006445776318889374
    completely	0.006396634763223227
    states	0.006286743675459915
    law	0.006009817769080257
    government	0.005870735254987818
    rate	0.005717125865256983
    crime	0.005551858992892133
    





[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97535762
  
      [Test build #31296 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31296/consoleFull) for   PR 4419 at commit [`4041723`](https://github.com/apache/spark/commit/4041723d4fe0dcfab845c9d2a2c72be2ed87895e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait LDAOptimizer `
      * `class EMLDAOptimizer extends LDAOptimizer `
      * `class OnlineLDAOptimizer extends LDAOptimizer `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97247660
  
    **Parameter names for online LDA**:
    It would be nice to have more interpretable names for the parameters
    
    **maxIterations**:
    Before, we had discussed having 1 "iteration" mean one pass over the full corpus.  It looks like you kept the meaning as one pass over a subsample.  That's OK with me since it matches the meaning used by GradientDescent.  I just want to confirm.
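    
    Under that reading, each iteration touches only `miniBatchFraction` of the corpus in expectation, so roughly `1 / miniBatchFraction` iterations amount to one pass over the data. With the settings from the 20_newsgroups run earlier in this thread:
    
    ```
    val numIterations = 50
    val miniBatchFraction = 0.02
    // Expected number of full passes over the corpus (sampling with replacement):
    val expectedPasses = numIterations * miniBatchFraction // = 1.0
    ```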




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97505423
  
      [Test build #31296 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31296/consoleFull) for   PR 4419 at commit [`4041723`](https://github.com/apache/spark/commit/4041723d4fe0dcfab845c9d2a2c72be2ed87895e).




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296377
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocations of next(), used to help decide the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    --- End diff --
    
    This default is not problem-dependent, so I'd recommend initializing the value to 1024 and not having a special value -1.  Can you please also make this fix for other parameters with defaults?
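
    (A sketch of the suggested pattern for members of `OnlineLDAOptimizer`, so the getter can simply return the field; illustrative, not the final code:)

        // Initialize to the real default instead of the -1 sentinel.
        private var tau_0: Double = 1024

        /**
         * A (positive) learning parameter that downweights early iterations.
         * Default: 1024, following the recommendation from the Online LDA paper.
         */
        def getTau_0: Double = this.tau_0

        def setTau_0(tau_0: Double): this.type = {
          require(tau_0 > 0, s"LDA tau_0 must be positive, but was set to $tau_0")
          this.tau_0 = tau_0
          this
        }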




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97294464
  
      [Test build #31206 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31206/consoleFull) for   PR 4419 at commit [`a996a82`](https://github.com/apache/spark/commit/a996a82e530a9e3d1465bb8872feb0ab70e579a3).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait LDAOptimizer `
      * `class EMLDAOptimizer extends LDAOptimizer `
      * `class OnlineLDAOptimizer extends LDAOptimizer `
    
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-73185882
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26899/
    Test FAILed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-84990748
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28996/
    Test PASSed.




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296382
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus on each call to next() and updates the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double] = null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocations of next(), used to help decide the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    --- End diff --
    
    This documentation is correct, but it may confuse users who see that 0.5 is not in the range stated.  How about stating the valid range, but recommending the range (0.5, 1.0]?  Also, let's not allow kappa = 0.
    We could also make the default be 0.51 so users don't ask questions.
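
    (For instance, along these lines; a sketch of the setter and doc, not the final wording:)

        /**
         * Learning rate: exponential decay rate. Must be positive; values in
         * (0.5, 1.0] guarantee asymptotic convergence.
         *  - default = 0.51, which lies inside the recommended range.
         */
        def setKappa(kappa: Double): this.type = {
          require(kappa > 0, s"OnlineLDAOptimizer kappa must be positive, but was set to $kappa")
          this.kappa = kappa
          this
        }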




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-97268385
  
    @jkbradley Thanks. I'll start working based on the current comments.
    Meanwhile, you're most welcome to send the PR. (I'm not sure how to set that up; can I just give you write access to my branch?)
    I plan to finish the unit tests today. The current plan is to test as many methods as possible against hand-computed results and against Blei's implementation, then run it on a small corpus and verify the results.
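
    (Even the small pieces are hand-verifiable in isolation; for example, a sketch of a check on the iteration weight, with an illustrative helper name:)

        // rho_t = (tau_0 + t)^(-kappa); hand-computable for simple inputs.
        def rho(tau0: Double, kappa: Double, t: Int): Double = math.pow(tau0 + t, -kappa)

        assert(math.abs(rho(1024, 0.5, 0) - 1.0 / 32.0) < 1e-12)  // 1024^(-0.5) = 1/32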




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/4419#issuecomment-76820786
  
    Thanks for the updates!  Responding:
    
    > About the batch split. I used docId % batchNumber to split documents into batchNumber batches in the new commit. Will that work? I'm not sure I understand how stochastic gradient descent helps in this case.
    
    That should help distribute the work; it will be good to see numbers about whether subsampling speeds things up enough.  (I mentioned SGD because you could take a random sample on each iteration, rather than a deterministic sample.  You wouldn't be able to use the other SGD code in MLlib, but a random sample would effectively be doing mini-batch SGD.  That might be a bit better since stochasticity is usually helpful in these non-convex problems.)
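
    (A sketch of the two options; the helper names are illustrative:)

        import org.apache.spark.mllib.linalg.Vector
        import org.apache.spark.rdd.RDD

        // Deterministic split: document i always lands in batch (i % batchNumber).
        def deterministicBatch(corpus: RDD[(Long, Vector)], batchNumber: Int, batchId: Int) =
          corpus.filter { case (docId, _) => docId % batchNumber == batchId }

        // Fresh random subsample each iteration: effectively mini-batch SGD.
        def randomBatch(corpus: RDD[(Long, Vector)], batchNumber: Int, iteration: Int, seed: Long) =
          corpus.sample(withReplacement = false, 1.0 / batchNumber, seed + iteration)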
    
    > My initial idea is to support local matrices for now and add support for distributed matrices in the future.
    
    That sounds good.  I don't think you need to implement a distributed version in this PR, but it will be good to think ahead now so that we can later generalize to a distributed version without breaking APIs.
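
    (A rough sketch of one way to keep that door open; the trait and its names are hypothetical, not proposed API:)

        import breeze.linalg.{DenseMatrix => BDM}

        // Keep the topic-term parameter behind a small interface so a distributed
        // backing (e.g. a row-partitioned matrix) can be swapped in later.
        trait TopicTermCounts {
          def k: Int
          def vocabSize: Int
          /** Materialize as a local matrix; a distributed impl could collect or fail fast. */
          def toLocal: BDM[Double]
        }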
    
    > Not sure how to fit the current version to the optimization steps. I thought the code is specific to LDA and hard to use in another context. Is there any example I can refer to?
    
    There's a nice explanation in Section 2.3 of the original paper: [Online Learning for Latent Dirichlet Allocation](https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf).  I haven't thought carefully about whether this affects computation, but I think it'd be doable.  Don't bother, though, if it makes the code harder to understand; I mainly hope it will make the code easier to understand.
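
    (For reference, the Section 2.3 update amounts to the blend below, sketched here with Breeze; `lambda` is the global parameter and `lambdaHat` the estimate computed from the current mini-batch:)

        import breeze.linalg.{DenseMatrix => BDM}

        def blend(lambda: BDM[Double], lambdaHat: BDM[Double],
            tau0: Double, kappa: Double, iteration: Int): BDM[Double] = {
          val rho = math.pow(tau0 + iteration, -kappa)  // step size, decays with t
          lambda * (1.0 - rho) + lambdaHat * rho        // lambda <- (1 - rho) * lambda + rho * lambdaHat
        }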
    
    I'll try to make another close pass soon!




[GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296358
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -98,13 +94,38 @@ class EMLDAOptimizer extends LDAOptimizer{
       /**
        * Compute bipartite term/doc graph.
        */
    -  private[clustering] override def initialState(
    -      docs: RDD[(Long, Vector)],
    -      k: Int,
    -      docConcentration: Double,
    -      topicConcentration: Double,
    -      randomSeed: Long,
    -      checkpointInterval: Int): LDAOptimizer = {
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA):
    --- End diff --
    
    style: header can fit on 1 line
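
    e.g. (assuming the return type is `LDAOptimizer`, as in the old signature):

        private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {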

