Posted to commits@spark.apache.org by jk...@apache.org on 2016/04/06 20:36:30 UTC
spark git commit: [SPARK-14322][MLLIB] Use treeAggregate instead of reduce in OnlineLDAOptimizer
Repository: spark
Updated Branches:
refs/heads/master db0b06c6e -> 8cffcb60d
[SPARK-14322][MLLIB] Use treeAggregate instead of reduce in OnlineLDAOptimizer
## What changes were proposed in this pull request?
jira: https://issues.apache.org/jira/browse/SPARK-14322
OnlineLDAOptimizer uses RDD.reduce in two places where it could use treeAggregate. reduce combines all partition results on the driver in a single step, which can cause scalability issues as the number of partitions grows; treeAggregate merges them in a multi-level tree. This should be an easy fix.
This is also a bug: the reduce function (`_ += _`) modifies its first argument in place, which reduce does not permit, so we should use aggregate or treeAggregate, which supply a fresh zero value to accumulate into.
See this line: https://github.com/apache/spark/blob/f12f11e578169b47e3f8b18b299948c0670ba585/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala#L452
and a few lines below it.
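The mutation problem can be sketched without Spark. Below is a minimal plain-Scala illustration (the `Acc` class is a hypothetical stand-in for the Breeze matrix in the real code, not part of Spark): reducing with `_ += _` folds into the first element and clobbers it, while folding into a fresh zero value leaves the inputs intact.

```scala
// Sketch (plain Scala, no Spark): why `reduce(_ += _)` on mutable
// accumulators is unsafe, and why folding into a fresh zero value is not.
object ReduceMutationDemo {
  // Hypothetical mutable accumulator, standing in for a Breeze dense matrix.
  final class Acc(var value: Double) {
    def +=(other: Acc): Acc = { value += other.value; this }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(new Acc(1.0), new Acc(2.0), new Acc(3.0))

    // reduce(_ += _) uses the first element as the accumulator,
    // mutating it in place.
    val total = parts.reduce(_ += _)
    assert(total.value == 6.0)
    assert(parts.head.value == 6.0) // first input was clobbered

    // aggregate-style: fold into a fresh zero; inputs stay untouched.
    val parts2 = Seq(new Acc(1.0), new Acc(2.0), new Acc(3.0))
    val total2 = parts2.foldLeft(new Acc(0.0))(_ += _)
    assert(total2.value == 6.0)
    assert(parts2.head.value == 1.0) // inputs untouched
    println("ok")
  }
}
```

The same hazard applies on an RDD, where reduce may additionally reuse deserialized objects across elements; treeAggregate sidesteps it by taking an explicit zero value, as the patch below does with `BDM.zeros[Double](k, vocabSize)`.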
## How was this patch tested?
unit tests
Author: Yuhao Yang <hh...@gmail.com>
Closes #12106 from hhbyyh/ldaTreeReduce.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cffcb60
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cffcb60
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cffcb60
Branch: refs/heads/master
Commit: 8cffcb60deb82d04a5c6e144ec9927f6f7addc8b
Parents: db0b06c
Author: Yuhao Yang <hh...@gmail.com>
Authored: Wed Apr 6 11:36:26 2016 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Wed Apr 6 11:36:26 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8cffcb60/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 7491ab0..2b404a8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -451,10 +451,11 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
}
Iterator((stat, gammaPart))
}
- val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
+ val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
+ _ += _, _ += _)
expElogbetaBc.unpersist()
val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
- stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)
+ stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
val batchResult = statsSum :* expElogbeta.t
// Note that this is an optimization to avoid batch.count
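The aggregation pattern the patch switches to can be sketched without Spark. The helper below (a hypothetical illustration, not Spark's implementation) merges partial results pairwise in rounds, which is the idea behind RDD.treeAggregate: the driver no longer has to combine every partition result itself in one sequential pass.

```scala
// Sketch (plain Scala, no Spark): tree-style pairwise combining,
// the idea behind RDD.treeAggregate. Partial results are merged in
// roughly log2(n) rounds instead of one long fold on the driver.
object TreeCombineDemo {
  def treeCombine[A](xs: Seq[A])(combOp: (A, A) => A): A = {
    require(xs.nonEmpty, "need at least one partial result")
    var level = xs
    while (level.size > 1) {
      // Merge neighbors pairwise; an odd element passes through.
      level = level.grouped(2).map {
        case Seq(a, b) => combOp(a, b)
        case Seq(a)    => a
      }.toSeq
    }
    level.head
  }

  def main(args: Array[String]): Unit = {
    val partials = (1 to 8).map(_.toDouble)
    assert(treeCombine(partials)(_ + _) == 36.0)
    println("ok")
  }
}
```

In Spark the rounds correspond to intermediate aggregation stages (controlled by treeAggregate's `depth` parameter, default 2), and `combOp` must still be able to mutate only the zero-initialized accumulator, which is why the patch passes `BDM.zeros[Double](k, vocabSize)` as the zero value.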