Posted to commits@spark.apache.org by jk...@apache.org on 2016/04/06 20:36:30 UTC

spark git commit: [SPARK-14322][MLLIB] Use treeAggregate instead of reduce in OnlineLDAOptimizer

Repository: spark
Updated Branches:
  refs/heads/master db0b06c6e -> 8cffcb60d


[SPARK-14322][MLLIB] Use treeAggregate instead of reduce in OnlineLDAOptimizer

## What changes were proposed in this pull request?
jira: https://issues.apache.org/jira/browse/SPARK-14322

OnlineLDAOptimizer calls RDD.reduce in two places where it could use treeAggregate. Because reduce merges every partition's result on the driver, this can cause scalability issues; the fix should be easy.
It is also a bug: the function passed to reduce modifies its first argument in place, so we should use aggregate or treeAggregate instead, both of which allow that.
See this line: https://github.com/apache/spark/blob/f12f11e578169b47e3f8b18b299948c0670ba585/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala#L452
and a few lines below it.
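
To illustrate the mutation problem described above, here is a minimal sketch in plain Scala, with no Spark involved. A `Seq[Array[Double]]` stands in for the RDD of per-partition statistics, and `addInPlace` stands in for `_ += _` on a Breeze matrix; the object and method names are hypothetical and do not appear in the patch. With `reduce`, the accumulator is the first input element, so that element is overwritten. Starting from a fresh zero value, which is what `aggregate` and `treeAggregate` require, leaves the inputs untouched.

```scala
// Hypothetical illustration only: plain Scala collections stand in for an RDD.
object ReduceMutationSketch {
  // In-place element-wise add that returns its first argument,
  // analogous to `_ += _` on a mutable matrix.
  def addInPlace(acc: Array[Double], x: Array[Double]): Array[Double] = {
    var i = 0
    while (i < acc.length) { acc(i) += x(i); i += 1 }
    acc
  }

  // reduce has no zero value, so the first input element becomes the accumulator.
  def sumWithReduce(parts: Seq[Array[Double]]): Array[Double] =
    parts.reduce((a, b) => addInPlace(a, b))

  // A fresh zero array is the accumulator, so no input element is modified.
  // foldLeft plays the role of the seqOp passed to aggregate/treeAggregate.
  def sumWithZero(parts: Seq[Array[Double]], n: Int): Array[Double] =
    parts.foldLeft(Array.fill(n)(0.0))((acc, x) => addInPlace(acc, x))

  def main(args: Array[String]): Unit = {
    val a = Seq(Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0))
    sumWithReduce(a)
    println(a.head.mkString(","))  // prints 9.0,12.0: the first input was overwritten

    val b = Seq(Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0))
    val total = sumWithZero(b, 2)
    println(b.head.mkString(","))  // prints 1.0,2.0: inputs are intact
    println(total.mkString(","))   // prints 9.0,12.0
  }
}
```

In Spark the overwritten element may belong to data that is read again later, for example the `stats` RDD, which this method uses a second time to build `gammat`.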

## How was this patch tested?
unit tests

Author: Yuhao Yang <hh...@gmail.com>

Closes #12106 from hhbyyh/ldaTreeReduce.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cffcb60
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cffcb60
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cffcb60

Branch: refs/heads/master
Commit: 8cffcb60deb82d04a5c6e144ec9927f6f7addc8b
Parents: db0b06c
Author: Yuhao Yang <hh...@gmail.com>
Authored: Wed Apr 6 11:36:26 2016 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Wed Apr 6 11:36:26 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8cffcb60/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 7491ab0..2b404a8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -451,10 +451,11 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
       Iterator((stat, gammaPart))
     }
-    val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
+    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
+      _ += _, _ += _)
     expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)
+      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
     val batchResult = statsSum :* expElogbeta.t
 
     // Note that this is an optimization to avoid batch.count

