You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by yinxusen <gi...@git.apache.org> on 2014/04/22 07:26:34 UTC

[GitHub] spark pull request: JIRA issue: [SPARK-1405](https://issues.apache...

GitHub user yinxusen opened a pull request:

    https://github.com/apache/spark/pull/476

    JIRA issue: [SPARK-1405](https://issues.apache.org/jira/browse/SPARK-1405) Gibbs sampling based Latent Dirichlet Allocation (LDA) for MLlib

    (This PR is based on a joint work done with @liancheng four months ago.)
    
    ## Overview
    
    LDA is a classical topic model in machine learning, that provides the ability to extract topics from corpus. Gibbs sampling (GS for short) is a common way to optimize LDA model.
    
    The LDA model consists of four matrices, two 1-dim matrices:
    
    * Document counts
    * Topic counts
    
    plus two 2-dim matrices:
    
    * Document-Topic counts
    * Topic-Term counts
    
    ## Implementation details
    
    * An accumulator is used to aggregate all updated values and applies them on the old model computed in the last iteration.
    
    * [Chalk](https://github.com/scalanlp/chalk) is used for term segmentation. Though it is easy to rewrite it with Lucene analyzers, I think MLlib should not take the burden to maintain an implementation of tokenizer.
    
    * `SparkContext.wholeTextFiles()` is convenient for offline experimentation, while `SparkContext.textFile()` is better for online applications.
    
    * Document dictionary and term dictionary are broadcasted to translate document names and terms into `Int` IDs.
    
    * Topic assignment matrix from the last iteration is cached for the current iteration, and then unpersisted to release memory.
    
    * LDA suffers similar stack overflow problem of MLlib ALS ([SPARK-1006](https://spark-project.atlassian.net/browse/SPARK-1006)). To workaround this issue, we checkpoint every a few iterations.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yinxusen/spark lda

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #476
    
----
commit 1f8793af562163e251d593c0f5118dea9176d7b5
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-04-22T01:45:52Z

    initial commit

commit e137287f5cd73290fa558c07ededbeb091eda215
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-04-22T01:48:52Z

    fix import style

commit 7378cff8eae01a726a1ea53b21db5f6972d6f14e
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-04-22T01:52:30Z

    ready for PR

commit 063ff0fa607e956923dda32ebfcb4629583867d5
Author: Cheng Lian <li...@gmail.com>
Date:   2014-04-22T02:32:36Z

    Code cleanup

commit 45b157edfa4e6444809daca9b0b2d57e2b575e4b
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-04-22T05:06:56Z

    fix minor error

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126094
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    +
    +case class LDAParams (
    +    docCounts: Vector,
    +    topicCounts: Vector,
    +    docTopicCounts: Array[Vector],
    +    topicTermCounts: Array[Vector])
    +  extends Serializable {
    +
    +  def update(docId: Int, term: Int, topic: Int, inc: Int) = {
    +    docCounts.toBreeze(docId) += inc
    +    topicCounts.toBreeze(topic) += inc
    +    docTopicCounts(docId).toBreeze(topic) += inc
    +    topicTermCounts(topic).toBreeze(term) += inc
    +    this
    +  }
    +
    +  def merge(other: LDAParams) = {
    +    docCounts.toBreeze += other.docCounts.toBreeze
    +    topicCounts.toBreeze += other.topicCounts.toBreeze
    +
    +    var i = 0
    +    while (i < docTopicCounts.length) {
    +      docTopicCounts(i).toBreeze += other.docTopicCounts(i).toBreeze
    --- End diff --
    
    more breeze conversion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by jegonzal <gi...@git.apache.org>.
Github user jegonzal commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41755910
  
    I would be happy to talk more about this after the OSDI deadline.  As far as storing the model (or more precisely the counts and samples) as an a RDD, I think this really is necessary.  The model in this case should be on the order of the size of the data.  
    
    Essentially what you want is the ability to join the term topic counts with the document topic counts for each token in a given document.  Given these two counts tables (along with the background distribution of topics in the entire corpus) you can compute the new topic assignment.  
    
    Here is an implementation of the collapsed Gibbs sampler for LDA using GraphX:  https://github.com/amplab/graphx/pull/113
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126093
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    +
    +case class LDAParams (
    +    docCounts: Vector,
    +    topicCounts: Vector,
    +    docTopicCounts: Array[Vector],
    +    topicTermCounts: Array[Vector])
    +  extends Serializable {
    +
    +  def update(docId: Int, term: Int, topic: Int, inc: Int) = {
    +    docCounts.toBreeze(docId) += inc
    --- End diff --
    
    Doing the breeze conversion on every update seems inefficient. These variables should be private and created as breeze variables at initialization, only the user facing APIs need to be Vector, Array[Vector], etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12125916
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    +
    +case class LDAParams (
    +    docCounts: Vector,
    +    topicCounts: Vector,
    +    docTopicCounts: Array[Vector],
    +    topicTermCounts: Array[Vector])
    --- End diff --
    
    I expect that this will be *really* big - maybe the last two variables should be RDDs - similar to what we do with ALS.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126271
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala ---
    @@ -20,15 +20,17 @@ package org.apache.spark.mllib.util
     import scala.reflect.ClassTag
     
     import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}
    +import breeze.util.Index
    +import chalk.text.tokenize.JavaWordTokenizer
    --- End diff --
    
    I think chalk for tokenization is probably a fine choice for starters - but can you say what it buys us over regular scala StringTokenizer?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41005119
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41753574
  
    Before I get too deep into this review - I want to step back and think about whether we expect the model in this case to be on the order of the size of the data - I think it is, and if so, we may want to consider representing the model as RDD[DocumentTopicFeatures] and RDD[TopicWordFeatures], similar to what we do with ALS. This may change the algorithm substantially.
    
    Separately, maybe it makes sense to have a concrete use case to work with (reuters dataset or something) so that we can evaluate how much memory actually gets used given a reasonably sized corpus.
    
    Perhaps @mengxr or @jegonzal has a strong opinion on this. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12127214
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    --- End diff --
    
    You mean "termId: count" ? Yes it is a common way to do that. But I just consider the trade-off between statistical efficiency and hardware efficiency. If we combine the same term together in one document, it seems that the randomness is worse. Anyway, I'll try to modified it using SparseVector.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r15742427
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala ---
    @@ -233,4 +235,60 @@ object MLUtils {
         }
         sqDist
       }
    +
    +/**
    +   * Load corpus from a given path. Terms and documents will be translated into integers, with a
    +   * term-integer map and a document-integer map.
    +   *
    +   * @param dir The path of corpus.
    +   * @param dirStopWords The path of stop words.
    +   * @return (RDD[Document], Term-integer map, doc-integer map)
    +   */
    +  def loadCorpus(
    +      sc: SparkContext,
    +      dir: String,
    +      minSplits: Int,
    +      dirStopWords: String = ""):
    +  (RDD[Document], Index[String], Index[String]) = {
    +
    +    // Containers and indexers for terms and documents
    +    val termMap = Index[String]()
    +    val docMap = Index[String]()
    +
    +    val stopWords =
    +      if (dirStopWords == "") {
    +        Set.empty[String]
    +      }
    +      else {
    +        sc.textFile(dirStopWords, minSplits).
    +          map(x => x.replaceAll( """(?m)\s+$""", "")).distinct().collect().toSet
    +      }
    +    val broadcastStopWord = sc.broadcast(stopWords)
    +
    +    // Tokenize and filter terms
    +    val almostData = sc.wholeTextFiles(dir, minSplits).map { case (fileName, content) =>
    +      val tokens = JavaWordTokenizer(content)
    --- End diff --
    
    We should allow users to customize here.  We can add a parameter `tokenizer: (String) => Iterable[String]`, to  loadCorpus, and `dirStopWords ` is not required.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/476


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41006725
  
    Merged build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12127051
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    +
    +case class LDAParams (
    +    docCounts: Vector,
    +    topicCounts: Vector,
    +    docTopicCounts: Array[Vector],
    +    topicTermCounts: Array[Vector])
    --- End diff --
    
    That's make sense. I think the `docTopicCounts` could be sliced easily W.R.T. documents partitions. But for `topicTermCounts`, it's hard to do slice. I'll find a way to settle it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41772255
  
    Yep, thanks @jegonzal and @etrain , I'll try to fix these issues and look forward to the next step updating and discussion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41006728
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14318/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41005124
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126088
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    +
    +case class LDAParams (
    +    docCounts: Vector,
    +    topicCounts: Vector,
    +    docTopicCounts: Array[Vector],
    +    topicTermCounts: Array[Vector])
    +  extends Serializable {
    +
    +  def update(docId: Int, term: Int, topic: Int, inc: Int) = {
    +    docCounts.toBreeze(docId) += inc
    +    topicCounts.toBreeze(topic) += inc
    +    docTopicCounts(docId).toBreeze(topic) += inc
    --- End diff --
    
    Again, I think in this case the *model* might be really big - e.g. a billion documents in hundreds of topics. Or for the term side, millions of words in a vocabulary and hundreds of topics.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126176
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/expectation/GibbsSampling.scala ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.expectation
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV, sum}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.mllib.clustering.{Document, LDAParams}
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Gibbs sampling from a given dataset and org.apache.spark.mllib.model.
    + * @param data Dataset, such as corpus.
    + * @param numOuterIterations Number of outer iteration.
    + * @param numInnerIterations Number of inner iteration, used in each partition.
    + * @param docTopicSmoothing Document-topic smoothing.
    + * @param topicTermSmoothing Topic-term smoothing.
    + */
    +class GibbsSampling(
    --- End diff --
    
    Gibbs Sampling is a very useful general purpose tool to have. It's interface should be something more generic than RDD[Document], and the parameters should be amenable to domains other than text.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-56990648
  
    @yinxusen Per discussion on https://issues.apache.org/jira/browse/SPARK-1405, we want to have a GraphX-based implementation and distributed representation of the topic model. Do you mind closing this PR? Thanks for your contribution and @etrain @jegonzal and @witgo for code review! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126106
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    +
    +case class LDAParams (
    +    docCounts: Vector,
    +    topicCounts: Vector,
    +    docTopicCounts: Array[Vector],
    +    topicTermCounts: Array[Vector])
    +  extends Serializable {
    +
    +  def update(docId: Int, term: Int, topic: Int, inc: Int) = {
    +    docCounts.toBreeze(docId) += inc
    +    topicCounts.toBreeze(topic) += inc
    +    docTopicCounts(docId).toBreeze(topic) += inc
    +    topicTermCounts(topic).toBreeze(term) += inc
    +    this
    +  }
    +
    +  def merge(other: LDAParams) = {
    +    docCounts.toBreeze += other.docCounts.toBreeze
    +    topicCounts.toBreeze += other.topicCounts.toBreeze
    +
    +    var i = 0
    +    while (i < docTopicCounts.length) {
    +      docTopicCounts(i).toBreeze += other.docTopicCounts(i).toBreeze
    +      i += 1
    +    }
    +
    +    i = 0
    +    while (i < topicTermCounts.length) {
    +      topicTermCounts(i).toBreeze += other.topicTermCounts(i).toBreeze
    +      i += 1
    +    }
    +    this
    +  }
    +
    +  /**
    +   * This function used for computing the new distribution after drop one from current document,
    +   * which is a really essential part of Gibbs sampling for LDA, you can refer to the paper:
    +   * <I>Parameter estimation for text analysis</I>
    --- End diff --
    
    Link to paper, please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12132841
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/expectation/GibbsSampling.scala ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.expectation
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV, sum}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.mllib.clustering.{Document, LDAParams}
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Gibbs sampling from a given dataset and org.apache.spark.mllib.model.
    + * @param data Dataset, such as corpus.
    + * @param numOuterIterations Number of outer iteration.
    + * @param numInnerIterations Number of inner iteration, used in each partition.
    + * @param docTopicSmoothing Document-topic smoothing.
    + * @param topicTermSmoothing Topic-term smoothing.
    + */
    +class GibbsSampling(
    +    data: RDD[Document],
    +    numOuterIterations: Int,
    +    numInnerIterations: Int,
    +    docTopicSmoothing: Double,
    +    topicTermSmoothing: Double)
    +  extends Logging with Serializable {
    +
    +  import GibbsSampling._
    +
    +  /**
    +   * Main function of running a Gibbs sampling method. It contains two phases of total Gibbs
    +   * sampling: first is initialization, second is real sampling.
    +   */
    +  def runGibbsSampling(
    +      initParams: LDAParams,
    +      data: RDD[Document] = data,
    +      numOuterIterations: Int = numOuterIterations,
    +      numInnerIterations: Int = numInnerIterations,
    +      docTopicSmoothing: Double = docTopicSmoothing,
    +      topicTermSmoothing: Double = topicTermSmoothing): LDAParams = {
    +
    +    val numTerms = initParams.topicTermCounts.head.size
    +    val numDocs = initParams.docCounts.size
    +    val numTopics = initParams.topicCounts.size
    +
    +    // Construct topic assignment RDD
    +    logInfo("Start initialization")
    +
    +    val cpInterval = System.getProperty("spark.gibbsSampling.checkPointInterval", "10").toInt
    +    val sc = data.context
    +    val (initialParams, initialChosenTopics) = sampleTermAssignment(initParams, data)
    +
    +    // Gibbs sampling
    +    val (params, _, _) = Iterator.iterate((sc.accumulable(initialParams), initialChosenTopics, 0)) {
    --- End diff --
    
    Accumulator has the ability to do the fine-grained updating for LDA parameters. For aggregate, I have to use `mapPartitions` as shown in my previous version of LDA impl [here](https://github.com/yinxusen/lda_spark/blob/mappartition/src/main/scala/org.apache.spark.mllib.expectation/GibbsSampling.scala#L77). However, the previous impl is slower than current version, partly because the serialization of the huge parameters.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126247
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/expectation/GibbsSampling.scala ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.expectation
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV, sum}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.mllib.clustering.{Document, LDAParams}
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Gibbs sampling from a given dataset and org.apache.spark.mllib.model.
    + * @param data Dataset, such as corpus.
    + * @param numOuterIterations Number of outer iteration.
    + * @param numInnerIterations Number of inner iteration, used in each partition.
    + * @param docTopicSmoothing Document-topic smoothing.
    + * @param topicTermSmoothing Topic-term smoothing.
    + */
    +class GibbsSampling(
    +    data: RDD[Document],
    +    numOuterIterations: Int,
    +    numInnerIterations: Int,
    +    docTopicSmoothing: Double,
    +    topicTermSmoothing: Double)
    +  extends Logging with Serializable {
    +
    +  import GibbsSampling._
    +
    +  /**
    +   * Main function of running a Gibbs sampling method. It contains two phases of total Gibbs
    +   * sampling: first is initialization, second is real sampling.
    +   */
    +  def runGibbsSampling(
    +      initParams: LDAParams,
    +      data: RDD[Document] = data,
    +      numOuterIterations: Int = numOuterIterations,
    +      numInnerIterations: Int = numInnerIterations,
    +      docTopicSmoothing: Double = docTopicSmoothing,
    +      topicTermSmoothing: Double = topicTermSmoothing): LDAParams = {
    +
    +    val numTerms = initParams.topicTermCounts.head.size
    +    val numDocs = initParams.docCounts.size
    +    val numTopics = initParams.topicCounts.size
    +
    +    // Construct topic assignment RDD
    +    logInfo("Start initialization")
    +
    +    val cpInterval = System.getProperty("spark.gibbsSampling.checkPointInterval", "10").toInt
    +    val sc = data.context
    +    val (initialParams, initialChosenTopics) = sampleTermAssignment(initParams, data)
    +
    +    // Gibbs sampling
    +    val (params, _, _) = Iterator.iterate((sc.accumulable(initialParams), initialChosenTopics, 0)) {
    +      case (lastParams, lastChosenTopics, i) =>
    +        logInfo("Start Gibbs sampling")
    +
    +        val rand = new Random(42 + i * i)
    +        val params = sc.accumulable(LDAParams(numDocs, numTopics, numTerms))
    +        val chosenTopics = data.zip(lastChosenTopics).map {
    +          case (Document(docId, content), topics) =>
    +            content.zip(topics).map { case (term, topic) =>
    +              lastParams += (docId, term, topic, -1)
    +
    +              val chosenTopic = lastParams.localValue.dropOneDistSampler(
    +                docTopicSmoothing, topicTermSmoothing, term, docId, rand)
    +
    +              lastParams += (docId, term, chosenTopic, 1)
    +              params += (docId, term, chosenTopic, 1)
    +
    +              chosenTopic
    +            }
    +        }.cache()
    +
    +        if (i + 1 % cpInterval == 0) {
    +          chosenTopics.checkpoint()
    +        }
    +
    +        // Trigger a job to collect accumulable LDA parameters.
    +        chosenTopics.count()
    +        lastChosenTopics.unpersist()
    +
    +        (params, chosenTopics, i + 1)
    +    }.drop(1 + numOuterIterations).next()
    +
    +    params.value
    +  }
    +
    +  /**
    +   * Model matrix Phi and Theta are inferred via LDAParams.
    +   */
    +  def solvePhiAndTheta(
    +      params: LDAParams,
    +      docTopicSmoothing: Double = docTopicSmoothing,
    +      topicTermSmoothing: Double = topicTermSmoothing): (Array[Vector], Array[Vector]) = {
    --- End diff --
    
    Again, Phi and Theta might be too big.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12125991
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    --- End diff --
    
    Hmm... if documents are just an ID and a list of token IDs, maybe something like a SparseVector is a better representation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41753636
  
    Also, speaking of @jegonzal maybe this is a natural first point of integration between MLlib and GraphX - I know GraphX has an implementation of LDA built in, and maybe this is a chance for us to leverage that work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126002
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    +
    +case class LDAParams (
    --- End diff --
    
    This should be called just "LDA" since it's the class that fits the model.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/476#issuecomment-41755358
  
    Yep, I know @jegonzal for his paper *Parallel Gibbs Sampling*. But I only have the idea of the implementation on GraphLab and not find the impl in GraphX. It's great if I have the chance to talk with Joseph offline.
    
    Besides, I will add a use case for reuters dataset and try to fix the issues put above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126007
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV}
    +
    +import org.apache.spark.{AccumulableParam, Logging, SparkContext}
    +import org.apache.spark.mllib.expectation.GibbsSampling
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.rdd.RDD
    +
    +case class Document(docId: Int, content: Iterable[Int])
    +
    +case class LDAParams (
    --- End diff --
    
    Also - I don't think it should be a case class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: JIRA issue: [SPARK-1405] Gibbs sampling based ...

Posted by etrain <gi...@git.apache.org>.
Github user etrain commented on a diff in the pull request:

    https://github.com/apache/spark/pull/476#discussion_r12126191
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/expectation/GibbsSampling.scala ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.expectation
    +
    +import java.util.Random
    +
    +import breeze.linalg.{DenseVector => BDV, sum}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.mllib.clustering.{Document, LDAParams}
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Gibbs sampling from a given dataset and org.apache.spark.mllib.model.
    + * @param data Dataset, such as corpus.
    + * @param numOuterIterations Number of outer iteration.
    + * @param numInnerIterations Number of inner iteration, used in each partition.
    + * @param docTopicSmoothing Document-topic smoothing.
    + * @param topicTermSmoothing Topic-term smoothing.
    + */
    +class GibbsSampling(
    +    data: RDD[Document],
    +    numOuterIterations: Int,
    +    numInnerIterations: Int,
    +    docTopicSmoothing: Double,
    +    topicTermSmoothing: Double)
    +  extends Logging with Serializable {
    +
    +  import GibbsSampling._
    +
    +  /**
    +   * Main function of running a Gibbs sampling method. It contains two phases of total Gibbs
    +   * sampling: first is initialization, second is real sampling.
    +   */
    +  def runGibbsSampling(
    +      initParams: LDAParams,
    +      data: RDD[Document] = data,
    +      numOuterIterations: Int = numOuterIterations,
    +      numInnerIterations: Int = numInnerIterations,
    +      docTopicSmoothing: Double = docTopicSmoothing,
    +      topicTermSmoothing: Double = topicTermSmoothing): LDAParams = {
    +
    +    val numTerms = initParams.topicTermCounts.head.size
    +    val numDocs = initParams.docCounts.size
    +    val numTopics = initParams.topicCounts.size
    +
    +    // Construct topic assignment RDD
    +    logInfo("Start initialization")
    +
    +    val cpInterval = System.getProperty("spark.gibbsSampling.checkPointInterval", "10").toInt
    +    val sc = data.context
    +    val (initialParams, initialChosenTopics) = sampleTermAssignment(initParams, data)
    +
    +    // Gibbs sampling
    +    val (params, _, _) = Iterator.iterate((sc.accumulable(initialParams), initialChosenTopics, 0)) {
    --- End diff --
    
    Why an accumulator and not an .aggregate()?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---