Posted to commits@spark.apache.org by jk...@apache.org on 2017/10/18 17:46:54 UTC
spark git commit: [SPARK-14371][MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver
Repository: spark
Updated Branches:
refs/heads/master 1f25d8683 -> 52facb006
[SPARK-14371][MLLIB] OnlineLDAOptimizer should not collect stats for each doc in mini-batch to driver
# What changes were proposed in this pull request?

As proposed by jkbradley, ```gammat``` is no longer collected to the driver. Instead, each partition's sufficient statistics, an optional running ```logphat``` sum, and the non-empty document count are combined with a single ```treeAggregate```.

# How was this patch tested?

Existing test suite.
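To make the new aggregation concrete, here is a minimal, Breeze-only sketch of the pattern the patch adopts (a reconstruction for illustration, not the Spark code itself; the object name, the mock partition data, and ```main``` are invented for the example). Each partition emits a tuple of sufficient statistics, an optional ```logphat``` accumulator, and a non-empty-document count; tuples are merged pairwise so only fixed-size state ever reaches the driver.

```scala
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

object AggregationSketch {
  // One partition's contribution: sufficient statistics, an optional
  // running logphat sum (present only when alpha is being optimized),
  // and the number of non-empty documents seen.
  type Stats = (BDM[Double], Option[BDV[Double]], Long)

  // Merge two contributions in place, as the patch's elementWiseSum does.
  // The same function serves as both seqOp and combOp of treeAggregate,
  // so nothing per-document is ever shipped to the driver.
  def elementWiseSum(u: Stats, v: Stats): Stats = {
    u._1 += v._1                // sum the k x vocabSize statistics
    u._2.foreach(_ += v._2.get) // sum logphat when it is tracked
    (u._1, u._2, u._3 + v._3)   // accumulate the document count
  }

  def main(args: Array[String]): Unit = {
    val k = 2
    val vocabSize = 3
    // Two mock partition results standing in for the RDD's output.
    val parts: Seq[Stats] = Seq(
      (BDM.ones[Double](k, vocabSize), Some(BDV.ones[Double](k)), 4L),
      (BDM.ones[Double](k, vocabSize), Some(BDV.ones[Double](k)), 6L))
    val zero: Stats =
      (BDM.zeros[Double](k, vocabSize), Some(BDV.zeros[Double](k)), 0L)
    val (statsSum, logphatOption, nonEmptyDocsN) =
      parts.foldLeft(zero)(elementWiseSum)
    // logphat is averaged over non-empty documents, as in the patch.
    logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
    println(s"docs=$nonEmptyDocsN, statsSum=$statsSum, logphat=$logphatOption")
  }
}
```

Because the merge is associative and mutates its left operand in place, the patch can pass ```elementWiseSum``` as both arguments of ```treeAggregate```, which is what removes the per-document ```collect()``` to the driver.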
Author: Valeriy Avanesov <av...@wias-berlin.de>
Author: Valeriy Avanesov <ac...@gmail.com>
Closes #18924 from akopich/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52facb00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52facb00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52facb00
Branch: refs/heads/master
Commit: 52facb0062a4253fa45ac0c633d0510a9b684a62
Parents: 1f25d86
Author: Valeriy Avanesov <av...@wias-berlin.de>
Authored: Wed Oct 18 10:46:46 2017 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Wed Oct 18 10:46:46 2017 -0700
----------------------------------------------------------------------
.../spark/mllib/clustering/LDAOptimizer.scala | 82 ++++++++++++++------
1 file changed, 57 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/52facb00/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index d633893..693a2a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -26,6 +26,7 @@ import breeze.stats.distributions.{Gamma, RandBasis}
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.PeriodicGraphCheckpointer
+import org.apache.spark.internal.Logging
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -259,7 +260,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
*/
@Since("1.4.0")
@DeveloperApi
-final class OnlineLDAOptimizer extends LDAOptimizer {
+final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
// LDA common parameters
private var k: Int = 0
@@ -462,31 +463,61 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
val alpha = this.alpha.asBreeze
val gammaShape = this.gammaShape
-
- val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
+ val optimizeDocConcentration = this.optimizeDocConcentration
+ // If and only if optimizeDocConcentration is set to true,
+ // we calculate logphat in the same pass as the other statistics.
+ // No calculation of logphat happens otherwise.
+ val logphatPartOptionBase = () => if (optimizeDocConcentration) {
+ Some(BDV.zeros[Double](k))
+ } else {
+ None
+ }
+
+ val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
val stat = BDM.zeros[Double](k, vocabSize)
- var gammaPart = List[BDV[Double]]()
+ val logphatPartOption = logphatPartOptionBase()
+ var nonEmptyDocCount: Long = 0L
nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
+ nonEmptyDocCount += 1
val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
termCounts, expElogbetaBc.value, alpha, gammaShape, k)
- stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
- gammaPart = gammad :: gammaPart
+ stat(::, ids) := stat(::, ids) + sstats
+ logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
}
- Iterator((stat, gammaPart))
- }.persist(StorageLevel.MEMORY_AND_DISK)
- val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
- _ += _, _ += _)
- val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
- stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
- stats.unpersist()
+ Iterator((stat, logphatPartOption, nonEmptyDocCount))
+ }
+
+ val elementWiseSum = (
+ u: (BDM[Double], Option[BDV[Double]], Long),
+ v: (BDM[Double], Option[BDV[Double]], Long)) => {
+ u._1 += v._1
+ u._2.foreach(_ += v._2.get)
+ (u._1, u._2, u._3 + v._3)
+ }
+
+ val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
+ .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
+ elementWiseSum, elementWiseSum
+ )
+
expElogbetaBc.destroy(false)
- val batchResult = statsSum *:* expElogbeta.t
+ if (nonEmptyDocsN == 0) {
+ logWarning("No non-empty documents were submitted in the batch.")
+ // Therefore, there is no need to update any of the model parameters.
+ return this
+ }
+
+ val batchResult = statsSum *:* expElogbeta.t
// Note that this is an optimization to avoid batch.count
- updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)
- if (optimizeDocConcentration) updateAlpha(gammat)
+ val batchSize = (miniBatchFraction * corpusSize).ceil.toInt
+ updateLambda(batchResult, batchSize)
+
+ logphatOption.foreach(_ /= nonEmptyDocsN.toDouble)
+ logphatOption.foreach(updateAlpha(_, nonEmptyDocsN))
+
this
}
@@ -503,21 +534,22 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
}
/**
- * Update alpha based on `gammat`, the inferred topic distributions for documents in the
- * current mini-batch. Uses Newton-Rhapson method.
+ * Update alpha based on `logphat`.
+ * Uses the Newton-Raphson method.
* @see Section 3.3, Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters
* (http://jonathan-huang.org/research/dirichlet/dirichlet.pdf)
+ * @param logphat Expectation of the estimated log-posterior distribution of
+ * topics in a document, averaged over the batch.
+ * @param nonEmptyDocsN number of non-empty documents
*/
- private def updateAlpha(gammat: BDM[Double]): Unit = {
+ private def updateAlpha(logphat: BDV[Double], nonEmptyDocsN: Double): Unit = {
val weight = rho()
- val N = gammat.rows.toDouble
val alpha = this.alpha.asBreeze.toDenseVector
- val logphat: BDV[Double] =
- sum(LDAUtils.dirichletExpectation(gammat)(::, breeze.linalg.*)).t / N
- val gradf = N * (-LDAUtils.dirichletExpectation(alpha) + logphat)
- val c = N * trigamma(sum(alpha))
- val q = -N * trigamma(alpha)
+ val gradf = nonEmptyDocsN * (-LDAUtils.dirichletExpectation(alpha) + logphat)
+
+ val c = nonEmptyDocsN * trigamma(sum(alpha))
+ val q = -nonEmptyDocsN * trigamma(alpha)
val b = sum(gradf / q) / (1D / c + sum(1D / q))
val dalpha = -(gradf - b) / q
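For readers following the math, the rewritten `updateAlpha` performs one Newton-Raphson step for the Dirichlet MLE (Huang, Sec. 3.3), exploiting the fact that the Hessian is a diagonal matrix plus a rank-one term, so the step costs O(k) without materializing a k x k matrix. Below is a minimal Breeze-only reconstruction of that step; `dirichletExpectation` is a stand-in for `LDAUtils.dirichletExpectation`, and because the diff is cut off before `dalpha` is applied, the sketch stops at computing the step.

```scala
import breeze.linalg.{sum, DenseVector => BDV}
import breeze.numerics.{digamma, trigamma}

object NewtonStepSketch {
  // Stand-in for LDAUtils.dirichletExpectation: E_q[log theta] under
  // Dirichlet(alpha) is digamma(alpha_k) - digamma(sum(alpha)).
  def dirichletExpectation(alpha: BDV[Double]): BDV[Double] =
    digamma(alpha) - digamma(sum(alpha))

  // One Newton-Raphson step (Huang, Sec. 3.3), mirroring the patched
  // updateAlpha: gradient gradf, rank-one constant c, diagonal q, and
  // the O(k) closed-form solve for the step dalpha.
  def newtonStep(alpha: BDV[Double], logphat: BDV[Double], n: Double): BDV[Double] = {
    val gradf = n * (-dirichletExpectation(alpha) + logphat)
    val c = n * trigamma(sum(alpha))
    val q = -n * trigamma(alpha)
    val b = sum(gradf / q) / (1D / c + sum(1D / q))
    -(gradf - b) / q // dalpha
  }
}
```

In the optimizer, the step would then be scaled by the learning rate `weight = rho()` (visible above) before updating `alpha`; the lines applying it fall outside the portion of the diff shown here.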
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org