You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by akopich <gi...@git.apache.org> on 2017/08/11 16:35:48 UTC

[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

GitHub user akopich opened a pull request:

    https://github.com/apache/spark/pull/18924

    [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver

    Hi, 
    
    as it was proposed by Joseph K. Bradley, gammat are not collected to the driver anymore. 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akopich/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18924
    
----
commit f81f1cdcf6de1dafdc79c1801cc2e2f1f803f4cc
Author: Valeriy Avanesov <ac...@gmail.com>
Date:   2017-08-11T16:28:38Z

    [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver
    
    gammat are not collected to a local matrix but rather represented as RDD[BDV[Double]] and are aggregated in a distributed manner

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140199136
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    Please, check out the updated PR. 
    
    I have added `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}`.
    Unfortunately, we cannot have the aggregation operation in a purely in-place manner now since `Int` is immutable. Shouldn't be a big deal since matrices and vectors are still updated in place. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140183412
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    Or another suggestion. Lets, have smth like 
    `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}`  where the `Int` stands for the number of non-empty elements in a partition.  
    
    What do you think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @WeichenXu123, @jkbradley, talking of merging. Is there anything else I should improve in this PR in order for it to be mergeable?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r139832997
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    Same with N


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140193380
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    But should we have `val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions {...}` stuff? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Oh, sorry for that, it should waiting @jkbradley to merge it. Don't worry, I will contact him!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142571685
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    --- End diff --
    
    indent


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142826379
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= batchSize.toDouble)
    --- End diff --
    
    That sounds right to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140249325
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    I think it doesn't matter if `seq` do not return left param reference but return a new object.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #81885 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81885/testReport)** for PR 18924 at commit [`9ce9655`](https://github.com/apache/spark/commit/9ce9655d2275454e016dd3f3b640e578f4b6e0e4).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/18924


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142571728
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    --- End diff --
    
    extra space after nonEmptyDocsN 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143064794
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    +                                        } else {
    +                                          None
    +                                        }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
     
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
    +
    +    if (nonEmptyDocsN == 0) {
    +      logWarning("No non-empty documents were submitted in the batch.")
    +      // Therefore, there is no need to update any of the model parameters
    +      return this
    +    }
    +
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
    +    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
    +
    +    expElogbetaBc.destroy(false)
    --- End diff --
    
    Great point. Thank you. Moreover, it should be destroyed as soon as there is no need in it anymore. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merging with master


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143077626
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    --- End diff --
    
    I see. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @jkbradley, no problem. The test build seems to be aborted. What's wrong?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r139467949
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption))
    +    }
    +
    +    val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]),
    +                                 v : (BDM[Double], Option[BDV[Double]])) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      u
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase))(
    +        elementWiseSumInPlace, elementWiseSumInPlace
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= batchSize.toDouble)
    +    logphatOption.foreach(updateAlpha(_, batchSize))
    +
    +    expElogbetaBc.destroy(false)
    +    stats.unpersist()
    --- End diff --
    
    This line is useless.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82111 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82111/testReport)** for PR 18924 at commit [`1f9e650`](https://github.com/apache/spark/commit/1f9e650baab01c3b248d5b0225dbec1cde0c737d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82505 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82505/testReport)** for PR 18924 at commit [`f181496`](https://github.com/apache/spark/commit/f1814965885e0c82a71287f5e5912e11b126b8a4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81893/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82487 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82487/testReport)** for PR 18924 at commit [`2942082`](https://github.com/apache/spark/commit/294208217b0cbf0fe745b7a8c603ec2f5675f5dc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82111/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143080675
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    +   * Uses Newton-Rhapson method.
        * @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters
        *      (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf)
    +   * @param logphat Expectation of estimated log-posterior distribution of
    +   *                topics in a document averaged over the batch.
    +   * @param nonEmptyDocsN number of non-empty documents
        */
    -  private def updateAlpha(gammat: BDM[Double]): Unit = {
    +  private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): Unit = {
    --- End diff --
    
    nonEmptyDocsN: Int ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #81893 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81893/testReport)** for PR 18924 at commit [`8e9bc73`](https://github.com/apache/spark/commit/8e9bc7308c0101131b6b1c6f7968c48514f18cce).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @hhbyyh, this change does not target performance but scalability, and I am afraid, the change is beneficial only for huge datasets and the tests would require massive computational resources. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @jkbradley, thanks for the comments. Who is supposed to create the followup jira? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82505/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #3951 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3951/testReport)** for PR 18924 at commit [`a81dae5`](https://github.com/apache/spark/commit/a81dae574f2085ec390effd1b9b1962970f00239).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143068229
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    --- End diff --
    
    About the comments. Keep it as you wish.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142571603
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    --- End diff --
    
    The comment is not that accurate. If `optimizeDocConcentration==false`, logphat will not be calculated.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @WeichenXu123, could you please notify @jkbradley once again?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140180799
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    @WeichenXu123, you are right. So should we add `stats.count()` or should we rather embed the counting in the aggregation phase so that we avoid the second pass?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82482 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82482/testReport)** for PR 18924 at commit [`68ca837`](https://github.com/apache/spark/commit/68ca837569f9a0ae5cac8ced7f35b13e2f941522).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142624246
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    --- End diff --
    
    Thanks. Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    ping @jkbradley. Anyway, tests are passed now. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @jkbradley, thank you!
    
    - Correctness: in order to test the equivalence of two versions of `submitMiniBatch` I have to bring both of them into the scope... One solution would be to derive a class `OldOnlineLDAOptimizer` from `OnlineLDAOptimizer` and override `submitMiniBatch` but the class is final. What's the preferred approach?
    - Sure. Sounds good. Should I add test-case reporting the CPU time or should I rather define an `App`? Should I add the code to the PR or just report the results here? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @akopich I'm afraid pings on Git don't work for me; I just have too many to keep up with.  Again, sorry for the delays; I have very limited bandwidth nowadays.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82506 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82506/testReport)** for PR 18924 at commit [`a81dae5`](https://github.com/apache/spark/commit/a81dae574f2085ec390effd1b9b1962970f00239).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Yes, I think local test is enough for both correctness and performance. 
    
    For consistency with old LDA, just some manual local test would be sufficient. You may well just use the LDA example and switch the Spark jar files between Spark 2.2 and your branch. And I think the case with empty document worth special attention.
    
    The same for the performance test. You may just post the result after your local test. I'm OK as long as there's no noticeable regression.
    
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142624340
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    --- End diff --
    
    Thanks. Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @WeichenXu123, no problem! Thank you.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143066229
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    +                                        } else {
    +                                          None
    +                                        }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
     
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
    +
    +    if (nonEmptyDocsN == 0) {
    --- End diff --
    
    But spark scala style guide says :
    
    "... \<when\> return is preferred: Use `return` as a guard to simplify control flow without adding a level of indentation".


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82027 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82027/testReport)** for PR 18924 at commit [`8227b1a`](https://github.com/apache/spark/commit/8227b1af574e70aa3cc42f9b5146d5757e4cfb37).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Thanks! I will take a look later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/80524/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @WeichenXu123, the PR seems to receive no attention for 10 days now... What should I do? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82448/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82007 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82007/testReport)** for PR 18924 at commit [`b3875e8`](https://github.com/apache/spark/commit/b3875e849a0659e9c2e5db16b97f909a0451ecd6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143060674
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    --- End diff --
    
    Thx. Fxd.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142625490
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= batchSize.toDouble)
    +    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
    +
    +    expElogbetaBc.destroy(false)
    +
         this
       }
     
       /**
    -   * Update lambda based on the batch submitted. batchSize can be different for each iteration.
    +   * Update lambda based on the batch submitted. nonEmptyDocsN can be different for each iteration.
    --- End diff --
    
    Thanks. Comment reverted. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143055573
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    --- End diff --
    
    Thinking again, logphatPartOptionBase should have been evaluated on the driver side. I'm not sure assigning optimizeDocConcentration to local variable is necessary here.
    
    comments can be simpler: calculate logphat only if optimizeDocConcentration is true, to update alpha.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143080051
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    --- End diff --
    
    Not an style expert myself. Just what I would use:
    ```
        val logphatPartOptionBase = () => {
          if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
        }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @feynmanliang , @hhbyyh, @WeichenXu123, could you please review the PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142632240
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= batchSize.toDouble)
    --- End diff --
    
    Thanks for the good point. Do I understand correctly that if a batch without any non-empty docs is submitted, the `submitMiniBatch` method shouldn't change the state of `LDAOptimizer`? 
    cc @WeichenXu123 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143080481
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                        Some(BDV.zeros[Double](k))
    +                                      } else {
    +                                        None
    +                                      }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    --- End diff --
    
    Minor: indent.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143058244
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    +                                        } else {
    +                                          None
    +                                        }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
     
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
    +
    +    if (nonEmptyDocsN == 0) {
    +      logWarning("No non-empty documents were submitted in the batch.")
    +      // Therefore, there is no need to update any of the model parameters
    +      return this
    +    }
    +
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
    +    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
    +
    +    expElogbetaBc.destroy(false)
    --- End diff --
    
    expElogbetaBc should always be destroyed. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82880 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82880/testReport)** for PR 18924 at commit [`a81dae5`](https://github.com/apache/spark/commit/a81dae574f2085ec390effd1b9b1962970f00239).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r139514402
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption))
    +    }
    +
    +    val elementWiseSumInPlace = (u : (BDM[Double], Option[BDV[Double]]),
    +                                 v : (BDM[Double], Option[BDV[Double]])) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      u
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]]) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase))(
    +        elementWiseSumInPlace, elementWiseSumInPlace
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= batchSize.toDouble)
    +    logphatOption.foreach(updateAlpha(_, batchSize))
    +
    +    expElogbetaBc.destroy(false)
    +    stats.unpersist()
    --- End diff --
    
    Do you mean `stats.unpersist()`? Sure, I got rid of it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    I'll update JIRA later; it seems like Apache JIRA is having problems right now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @WeichenXu123. thank you


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140111453
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    There're a small difference between old `N` and `batchSize`. `N` in old version code do not count non-empty docs.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142571342
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    --- End diff --
    
    If it's only used once, reassign to local variable is not necessary.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @WeichenXu123, thank you for your prompt reply!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82007/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143112965
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                        Some(BDV.zeros[Double](k))
    +                                      } else {
    +                                        None
    +                                      }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    --- End diff --
    
    ```
    val elementWiseSum = (u: (BDM[Double], Option[BDV[Double]], Long),
        v: (BDM[Double], Option[BDV[Double]], Long)) => {
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @akopich Yes you can wait this to be merged first. I think @jkbradley will have time to check this next week. Don't worry!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143084875
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,21 +533,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    +   * Uses Newton-Rhapson method.
        * @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters
        *      (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf)
    +   * @param logphat Expectation of estimated log-posterior distribution of
    +   *                topics in a document averaged over the batch.
    +   * @param nonEmptyDocsN number of non-empty documents
        */
    -  private def updateAlpha(gammat: BDM[Double]): Unit = {
    +  private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN : Double): Unit = {
    --- End diff --
    
    The methods will have to cast `nonEmptyDocsN: Int` to `Double`. This way we have the conversion implicitly, but the method is private so I don't think it's going to hurt.  


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @akopich  LGTM. and do you have time to create a PR to resolve random seed not working issue mentioned by @hhbyyh ?  Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82448 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82448/testReport)** for PR 18924 at commit [`5c9547f`](https://github.com/apache/spark/commit/5c9547f6bc85b68d50b3c58a4a0d17eb6c75dcaf).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82030 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82030/testReport)** for PR 18924 at commit [`d7004a1`](https://github.com/apache/spark/commit/d7004a126a363e5bbfce5b335098f938f593517d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142624984
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    --- End diff --
    
    I believe, this will be settled down SPARK-22111.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142620788
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    --- End diff --
    
    This line is necessary in order to avoid serialization of `LDASuite` which is not serializable. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82448 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82448/testReport)** for PR 18924 at commit [`5c9547f`](https://github.com/apache/spark/commit/5c9547f6bc85b68d50b3c58a4a0d17eb6c75dcaf).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @jkbradley, no problem.
    @jkbradley, @WeichenXu123, @hhbyyh, thank you all guys!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Yes, sure. Thank you for the valuable comment. Hopefully, I'll update the code this week.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143003890
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +462,54 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    +                                        } else {
    +                                          None
    +                                        }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
    --- End diff --
    
    Thanks for the comments, @jkbradley  and @hhbyyh. The check is added. I have also added a generation of warning message in case of an "empty" batch. I believe, a user should know that a thing like that happened. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @jkbradley, thank you for your comments! Please, check out the commit adding the necessary docs.
    
    Regarding tests: I believe, `OnlineLDAOptimizer alpha hyperparameter optimization` from `mllib/clustering/LDASuite.scala` covers the piece of code being rewritten. Or should there be tests of a different kind? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143056727
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    --- End diff --
    
    indent


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142622117
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    --- End diff --
    
    Thanks. Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82487/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143060537
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    --- End diff --
    
    If I don't assign logphatPartOptionBase to a local variable, NonSerializableException is generated.
    
    Regarding comments. Isn't it necessary to emphasise that the computation happens in the same pass?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143084656
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                        Some(BDV.zeros[Double](k))
    +                                      } else {
    +                                        None
    +                                      }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    --- End diff --
    
    Do you mean the extra spaces after `u` and `v`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r139470472
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase
    --- End diff --
    
    We can not reference outer variable and modify it in `map` function, it will generate undefined result.
    You need create the `logphatPartOption` object in the `map` function, like:
    ```
    val localOptimizeDocConcentration = optimizeDocConcentration
    batch.mapPartitions { docs =>
      ...
      val logphatPartOption = if (localOptimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
      ...
    ```
    And note that avoid directly use `optimizeDocConcentration` in `rdd.map` function because the var  is class member and will cause the whole class object to serialize.
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82880/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142833374
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    --- End diff --
    
    Sure, since we're talking about consistency with old LDA. It's fine to keep using batchSize here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r139514301
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +462,44 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val logphatPartOptionBase = if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]])] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase
    --- End diff --
    
    Great point. Thank you.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @akopich follow-up JIRA created here
    https://issues.apache.org/jira/browse/SPARK-22111
    Can you create follow up PR after this PR being merged ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r139832967
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    Document what logphat is


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82027 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82027/testReport)** for PR 18924 at commit [`8227b1a`](https://github.com/apache/spark/commit/8227b1af574e70aa3cc42f9b5146d5757e4cfb37).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82030 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82030/testReport)** for PR 18924 at commit [`d7004a1`](https://github.com/apache/spark/commit/d7004a126a363e5bbfce5b335098f938f593517d).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #80524 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80524/testReport)** for PR 18924 at commit [`f81f1cd`](https://github.com/apache/spark/commit/f81f1cdcf6de1dafdc79c1801cc2e2f1f803f4cc).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    BTW. Seems like `updateLambda` method relies (in older version as well) on `batchSize` only because this is `an optimization to avoid batch.count`. Shouldn't we rather use `nonEmptyDocsN` instead since we compute it efficietly now? But that is going to change logic...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140630215
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions { docs =>
    --- End diff --
    
    Done.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140188363
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    I think it's OK. Let's keep the logic exactly the same with old version.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82030/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142574453
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= batchSize.toDouble)
    --- End diff --
    
    Should use nonEmptyDocsN to be consistent with original implementation, also avoid divide by 0.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    LGTM
    Sorry for the delay!
    I'll merge it after re-running tests


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    LGTM. Thanks! ping @jkbradley 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142572013
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    --- End diff --
    
    Instead of batchSize, I think we may use nonEmptyDocsN directly. @jkbradley please double check since it will be a behavior change.
    Also please notice that nonEmptyDocsN can be zero, so be careful for the divide by 0. 



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142571627
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
    --- End diff --
    
    indent


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Taking a look


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143081342
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    +                                        } else {
    +                                          None
    +                                        }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
     
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
    +
    +    if (nonEmptyDocsN == 0) {
    --- End diff --
    
    I don't think that's the case here. But as long as all the cleanup work is done, I would not mind it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    So shall we ping @jkbradley, shan't we?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82506/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    ping @akopich This is an very useful improvement. Can you update the code while you're at it ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140031900
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +462,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    --- End diff --
    
    OK.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82482 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82482/testReport)** for PR 18924 at commit [`68ca837`](https://github.com/apache/spark/commit/68ca837569f9a0ae5cac8ced7f35b13e2f941522).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `final class OnlineLDAOptimizer extends LDAOptimizer with Logging `


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82482/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82487 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82487/testReport)** for PR 18924 at commit [`2942082`](https://github.com/apache/spark/commit/294208217b0cbf0fe745b7a8c603ec2f5675f5dc).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Ping @jkbradley .
    
    Thank you @WeichenXu123 one again for the comment! Please, have a look.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142833499
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= batchSize.toDouble)
    --- End diff --
    
    agree.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82506 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82506/testReport)** for PR 18924 at commit [`a81dae5`](https://github.com/apache/spark/commit/a81dae574f2085ec390effd1b9b1962970f00239).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140621444
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Int)] = batch.mapPartitions { docs =>
    --- End diff --
    
    Let's use Long for the doc count since it could overflow for large datasets and miniBatchFraction


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82027/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82505 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82505/testReport)** for PR 18924 at commit [`f181496`](https://github.com/apache/spark/commit/f1814965885e0c82a71287f5e5912e11b126b8a4).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142624093
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
    --- End diff --
    
    Thanks. Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143067455
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    --- End diff --
    
    Maybe define logphatPartOptionBase as Option but not function.
    val logphatPartOptionBase = if (optimizeDocConcentration) {
                                             Some(BDV.zeros[Double](k))


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142826326
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +462,54 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    +                                        } else {
    +                                          None
    +                                        }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
    --- End diff --
    
    Good point about dividing by 0, @hhbyyh .  We should probably just check nonEmptyDocsN to see if it's 0, and if it is, skip all of these updates.  That's related to but actually separate from the follow-up SPARK-22111.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81885/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82007 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82007/testReport)** for PR 18924 at commit [`b3875e8`](https://github.com/apache/spark/commit/b3875e849a0659e9c2e5db16b97f909a0451ecd6).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140030582
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +462,46 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    --- End diff --
    
    Add an inline doc note here:
    "We calculate logphat in the same pass as other statistics, but we only need it if we are optimizing docConcentration."


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    I have conducted some performance testing with random data. 
    
    The new implementation turns out to be notably faster. 
    
    ```
    OLD with hyper-parameter optimization  : 237 sec
    OLD w/o  hyper-parameter optimization  : 226 sec
    NEW with hyper-parameter optimization : 178 sec
    NEW w/o  hyper-parameter optimization : 171 sec
    ```
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143057944
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                          Some(BDV.zeros[Double](k))
    +                                        } else {
    +                                          None
    +                                        }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
     
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
    +
    +    if (nonEmptyDocsN == 0) {
    --- End diff --
    
    I would use 
    
    if (nonEmptyDocsN > 0) {
      update ...
    } else {
      logWarning...
    }
    this
    
    Just to avoid multiple exits.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #81893 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81893/testReport)** for PR 18924 at commit [`8e9bc73`](https://github.com/apache/spark/commit/8e9bc7308c0101131b6b1c6f7968c48514f18cce).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82111 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82111/testReport)** for PR 18924 at commit [`1f9e650`](https://github.com/apache/spark/commit/1f9e650baab01c3b248d5b0225dbec1cde0c737d).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #81885 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81885/testReport)** for PR 18924 at commit [`9ce9655`](https://github.com/apache/spark/commit/9ce9655d2275454e016dd3f3b640e578f4b6e0e4).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r140032198
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -503,17 +518,15 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
     
       /**
    -   * Update alpha based on `gammat`, the inferred topic distributions for documents in the
    -   * current mini-batch. Uses Newton-Rhapson method.
    +   * Update alpha based on `logphat`.
    --- End diff --
    
    I also rename `N` to `batchSize` which it is.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @WeichenXu123, thanks for creating Jira. Yes, sure I will work on it. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142831316
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    --- End diff --
    
    this may wait.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143069049
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    --- End diff --
    
    About `logphatPartOptionBase`: tried that, initially and failed. This was discussed above with @WeichenXu123. The problem is caused by in-place modifications. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r142574222
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,36 +462,55 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // We calculate logphat in the same pass as other statistics, but we only need
    +    // it if we are optimizing docConcentration
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) Some(BDV.zeros[Double](k))
    +                                      else None
     
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                                 v : (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN : Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
     
    +    val batchResult = statsSum *:* expElogbeta.t
         // Note that this is an optimization to avoid batch.count
    -    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
    -    if (optimizeDocConcentration) updateAlpha(gammat)
    +    val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
    +    updateLambda(batchResult, batchSize)
    +
    +    logphatOption.foreach(_ /= batchSize.toDouble)
    +    logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
    +
    +    expElogbetaBc.destroy(false)
    +
         this
       }
     
       /**
    -   * Update lambda based on the batch submitted. batchSize can be different for each iteration.
    +   * Update lambda based on the batch submitted. nonEmptyDocsN can be different for each iteration.
    --- End diff --
    
    comments should be consistent with code. Update code or revert comment change.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #80524 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/80524/testReport)** for PR 18924 at commit [`f81f1cd`](https://github.com/apache/spark/commit/f81f1cdcf6de1dafdc79c1801cc2e2f1f803f4cc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    Thank you, @hhbyyh.
    
    I have augmented the example a bit: explicitly set random seed a nd chosen online optimizer:
    
    `val lda = new LDA().setK(10).setMaxIter(10).setOptimizer("online").setSeed(13)`
    
    But for some reason if I run it twice, the results are not the same. Is that expected? branch-2.2 was used.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    @WeichenXu123, yes sure. But can this wait until this PR is merged? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should n...

Posted by akopich <gi...@git.apache.org>.
Github user akopich commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143159334
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set true,
    +    // we calculate logphat in the same pass as other statistics.
    +    // No calculation of loghat happens otherwise.
    +    val logphatPartOptionBase = () => if (optimizeDocConcentration) {
    +                                        Some(BDV.zeros[Double](k))
    +                                      } else {
    +                                        None
    +                                      }
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount : Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
    +
    +    val elementWiseSum = (u : (BDM[Double], Option[BDV[Double]], Long),
    +                          v : (BDM[Double], Option[BDV[Double]], Long)) => {
    --- End diff --
    
    I see now. Thank you. But seems like the style guide suggests to move both of the parameters to the new line. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18924: [SPARK-14371] [MLLIB] OnlineLDAOptimizer should not coll...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18924
  
    **[Test build #82880 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82880/testReport)** for PR 18924 at commit [`a81dae5`](https://github.com/apache/spark/commit/a81dae574f2085ec390effd1b9b1962970f00239).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org