You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2015/07/31 20:50:21 UTC

spark git commit: [SPARK-9231] [MLLIB] DistributedLDAModel method for top topics per document

Repository: spark
Updated Branches:
  refs/heads/master 6add4eddb -> 4011a9471


[SPARK-9231] [MLLIB] DistributedLDAModel method for top topics per document

jira: https://issues.apache.org/jira/browse/SPARK-9231

Adds a helper method to DistributedLDAModel of this form:
```
/**
 * For each document, return the top k weighted topics for that document and their weights.
 * @return RDD of (doc ID, topic indices, topic weights)
 */
def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])]
```

Author: Yuhao Yang <hh...@gmail.com>

Closes #7785 from hhbyyh/topTopicsPerdoc and squashes the following commits:

30ad153 [Yuhao Yang] small fix
fd24580 [Yuhao Yang] add topTopics per document to DistributedLDAModel


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4011a947
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4011a947
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4011a947

Branch: refs/heads/master
Commit: 4011a947154d97a9ffb5a71f077481a12534d36b
Parents: 6add4ed
Author: Yuhao Yang <hh...@gmail.com>
Authored: Fri Jul 31 11:50:15 2015 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Fri Jul 31 11:50:15 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/mllib/clustering/LDAModel.scala | 19 ++++++++++++++++++-
 .../apache/spark/mllib/clustering/LDASuite.scala | 13 ++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4011a947/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 6cfad3f..82281a0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.clustering
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, normalize, sum}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argtopk, normalize, sum}
 import breeze.numerics.{exp, lgamma}
 import org.apache.hadoop.fs.Path
 import org.json4s.DefaultFormats
@@ -591,6 +591,23 @@ class DistributedLDAModel private[clustering] (
     JavaPairRDD.fromRDD(topicDistributions.asInstanceOf[RDD[(java.lang.Long, Vector)]])
   }
 
+  /**
+   * For each document, return the top k weighted topics for that document and their weights.
+   * @return RDD of (doc ID, topic indices, topic weights)
+   */
+  def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])] = {
+    graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) =>
+      val topIndices = argtopk(topicCounts, k)
+      val sumCounts = sum(topicCounts)
+      val weights = if (sumCounts != 0) {
+        topicCounts(topIndices) / sumCounts
+      } else {
+        topicCounts(topIndices)
+      }
+      (docID.toLong, topIndices.toArray, weights.toArray)
+    }
+  }
+
   // TODO:
   // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4011a947/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index c43e1e5..695ee3b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.clustering
 
-import breeze.linalg.{DenseMatrix => BDM, max, argmax}
+import breeze.linalg.{DenseMatrix => BDM, argtopk, max, argmax}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.graphx.Edge
@@ -108,6 +108,17 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
       assert(topicDistribution.toArray.sum ~== 1.0 absTol 1e-5)
     }
 
+    val top2TopicsPerDoc = model.topTopicsPerDocument(2).map(t => (t._1, (t._2, t._3)))
+    model.topicDistributions.join(top2TopicsPerDoc).collect().foreach {
+      case (docId, (topicDistribution, (indices, weights))) =>
+        assert(indices.length == 2)
+        assert(weights.length == 2)
+        val bdvTopicDist = topicDistribution.toBreeze
+        val top2Indices = argtopk(bdvTopicDist, 2)
+        assert(top2Indices.toArray === indices)
+        assert(bdvTopicDist(top2Indices).toArray === weights)
+    }
+
     // Check: log probabilities
     assert(model.logLikelihood < 0.0)
     assert(model.logPrior < 0.0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org