You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2015/07/22 22:06:05 UTC
spark git commit: [SPARK-9224] [MLLIB] OnlineLDA Performance Improvements
Repository: spark
Updated Branches:
refs/heads/master e0b7ba59a -> 8486cd853
[SPARK-9224] [MLLIB] OnlineLDA Performance Improvements
Use in-place updates, reduce the number of transposes, and vectorize operations in the OnlineLDA implementation.
Author: Feynman Liang <fl...@databricks.com>
Closes #7454 from feynmanliang/OnlineLDA-perf-improvements and squashes the following commits:
78b0f5a [Feynman Liang] Make in-place variables vals, fix BLAS error
7f62a55 [Feynman Liang] --amend
c62cb1e [Feynman Liang] Outer product for stats, revert Range slicing
aead650 [Feynman Liang] Range slice, in-place update, reduce transposes
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8486cd85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8486cd85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8486cd85
Branch: refs/heads/master
Commit: 8486cd853104255b4eb013860bba793eef4e74e7
Parents: e0b7ba5
Author: Feynman Liang <fl...@databricks.com>
Authored: Wed Jul 22 13:06:01 2015 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Wed Jul 22 13:06:01 2015 -0700
----------------------------------------------------------------------
.../spark/mllib/clustering/LDAOptimizer.scala | 59 +++++++++-----------
1 file changed, 27 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8486cd85/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 8e5154b..b960ae6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -19,15 +19,15 @@ package org.apache.spark.mllib.clustering
import java.util.Random
-import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum, normalize, kron}
-import breeze.numerics.{digamma, exp, abs}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, normalize, sum}
+import breeze.numerics.{abs, digamma, exp}
import breeze.stats.distributions.{Gamma, RandBasis}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
-import org.apache.spark.mllib.linalg.{Matrices, SparseVector, DenseVector, Vector}
+import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector}
import org.apache.spark.rdd.RDD
/**
@@ -370,7 +370,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
iteration += 1
val k = this.k
val vocabSize = this.vocabSize
- val Elogbeta = dirichletExpectation(lambda)
+ val Elogbeta = dirichletExpectation(lambda).t
val expElogbeta = exp(Elogbeta)
val alpha = this.alpha
val gammaShape = this.gammaShape
@@ -385,41 +385,36 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
case v => throw new IllegalArgumentException("Online LDA does not support vector type "
+ v.getClass)
}
+ if (!ids.isEmpty) {
+
+ // Initialize the variational distribution q(theta|gamma) for the mini-batch
+ val gammad: BDV[Double] =
+ new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k) // K
+ val expElogthetad: BDV[Double] = exp(digamma(gammad) - digamma(sum(gammad))) // K
+ val expElogbetad: BDM[Double] = expElogbeta(ids, ::).toDenseMatrix // ids * K
+
+ val phinorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100 // ids
+ var meanchange = 1D
+ val ctsVector = new BDV[Double](cts) // ids
+
+ // Iterate between gamma and phi until convergence
+ while (meanchange > 1e-3) {
+ val lastgamma = gammad.copy
+ // K K * ids ids
+ gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phinorm))) :+ alpha
+ expElogthetad := exp(digamma(gammad) - digamma(sum(gammad)))
+ phinorm := expElogbetad * expElogthetad :+ 1e-100
+ meanchange = sum(abs(gammad - lastgamma)) / k
+ }
- // Initialize the variational distribution q(theta|gamma) for the mini-batch
- var gammad = new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k).t // 1 * K
- var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 * K
- var expElogthetad = exp(Elogthetad) // 1 * K
- val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids
-
- var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids
- var meanchange = 1D
- val ctsVector = new BDV[Double](cts).t // 1 * ids
-
- // Iterate between gamma and phi until convergence
- while (meanchange > 1e-3) {
- val lastgamma = gammad
- // 1*K 1 * ids ids * k
- gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
- Elogthetad = digamma(gammad) - digamma(sum(gammad))
- expElogthetad = exp(Elogthetad)
- phinorm = expElogthetad * expElogbetad + 1e-100
- meanchange = sum(abs(gammad - lastgamma)) / k
- }
-
- val m1 = expElogthetad.t
- val m2 = (ctsVector / phinorm).t.toDenseVector
- var i = 0
- while (i < ids.size) {
- stat(::, ids(i)) := stat(::, ids(i)) + m1 * m2(i)
- i += 1
+ stat(::, ids) := expElogthetad.asDenseMatrix.t * (ctsVector :/ phinorm).asDenseMatrix
}
}
Iterator(stat)
}
val statsSum: BDM[Double] = stats.reduce(_ += _)
- val batchResult = statsSum :* expElogbeta
+ val batchResult = statsSum :* expElogbeta.t
// Note that this is an optimization to avoid batch.count
update(batchResult, iteration, (miniBatchFraction * corpusSize).ceil.toInt)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org