Posted to reviews@spark.apache.org by d0evi1 <gi...@git.apache.org> on 2017/05/19 09:38:27 UTC

[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

GitHub user d0evi1 opened a pull request:

    https://github.com/apache/spark/pull/18034

    [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

    ## What changes were proposed in this pull request?
    
    The save function of LocalLDAModel has a bug; see:
    
    https://issues.apache.org/jira/browse/SPARK-20797
    
    This adds code similar to word2vec's save method (repartition) to avoid the bug.
    
    ## How was this patch tested?
    
    It is hard to test: it needs a large dataset to train with the online LDA method.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/d0evi1/spark mllib-mod-pr

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18034.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18034
    
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18034
  
    Can one of the admins verify this patch?




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18034#discussion_r141016974
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala ---
    @@ -468,7 +469,16 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
           val topics = Range(0, k).map { topicInd =>
             Data(Vectors.dense((topicsDenseMatrix(::, topicInd).toArray)), topicInd)
           }
    -      spark.createDataFrame(topics).repartition(1).write.parquet(Loader.dataPath(path))
    +
    +      val bufferSize = Utils.byteStringAsBytes(
    +        spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
    +      // We calculate the approximate size of the model
    +      // We only calculate the array size, considering an
    +      // average string size of 15 bytes, the formula is:
    +      // (floatSize * vectorSize + 15) * numWords
    +      val approxSize = (4L * k + 15) * topicsMatrix.numRows
    +      val nPartitions = ((approxSize / bufferSize) + 1).toInt
    +      spark.createDataFrame(topics).repartition(nPartitions).write.parquet(Loader.dataPath(path))
    --- End diff --
    
    Can you add a test case that saves the model into multiple files, loads it back, and checks correctness?
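
For readers following the arithmetic in the diff above, here is a minimal sketch of the partition-count calculation in plain Scala, without Spark. The object and method names are illustrative assumptions, and `bufferSizeBytes` stands in for the parsed value of spark.kryoserializer.buffer.max. The 4-byte element size and 15-byte overhead are carried over from the patch's comment; MLlib vectors hold Double values, so the real size may be larger than this estimate.

```scala
// Sketch of the partition-count arithmetic from the diff (names are hypothetical).
object PartitionEstimate {
  /** Approximate model size in bytes: (4 * k + 15) * vocabSize, as in the patch. */
  def approxSize(k: Int, vocabSize: Int): Long =
    (4L * k + 15) * vocabSize

  /** Number of partitions: integer quotient of size by buffer, plus one. */
  def numPartitions(k: Int, vocabSize: Int, bufferSizeBytes: Long): Int =
    ((approxSize(k, vocabSize) / bufferSizeBytes) + 1).toInt

  def main(args: Array[String]): Unit = {
    val bufferSizeBytes = 64L * 1024 * 1024 // the "64m" default used in the patch
    // k = 300 and vocabSize = 500000 match the simulated matrix in this thread.
    println(numPartitions(k = 300, vocabSize = 500000, bufferSizeBytes)) // prints 10
  }
}
```

Because of the trailing `+ 1`, the result is always at least 1, so a small model would still be written as a single file under this formula.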




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/18034




[GitHub] spark issue #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/18034
  
    Any update? The author seems inactive. Please let me know if I am mistaken; otherwise, let me leave this closed.




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18034#discussion_r117745103
  
    @d0evi1 I'm not sure how that's related to my comment.
    @hhbyyh what do you think -- am I wrong about the issue?




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by d0evi1 <gi...@git.apache.org>.
Github user d0evi1 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18034#discussion_r117602669
  
    Why not? I think it does work. The multiple parquet files may be in random order, but each row saves its topic index. When you call load, parquet restores the DataFrame; you can check LocalLDAModel's load method, which scans all of the DataFrame's rows and uses the topic indices to rebuild the (topic x vocab) breeze matrix.
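
The argument above, that row order does not matter because every row carries its topic index, can be illustrated with a minimal sketch in plain Scala. This is not Spark's load code: `TopicRow` and `rebuild` are hypothetical names, and a flat column-major array stands in for the breeze matrix.

```scala
import scala.util.Random

// Sketch: rebuild a column-major (vocabSize x k) matrix from rows that carry
// their own topic index, so the order in which rows are read back is irrelevant.
object RebuildByIndex {
  final case class TopicRow(topic: Array[Double], index: Int)

  def rebuild(rows: Seq[TopicRow], vocabSize: Int, k: Int): Array[Double] = {
    val values = new Array[Double](vocabSize * k)
    rows.foreach { row =>
      // Column `index` occupies one contiguous slice in column-major layout.
      Array.copy(row.topic, 0, values, row.index * vocabSize, vocabSize)
    }
    values
  }

  def main(args: Array[String]): Unit = {
    val vocabSize = 4
    val k = 3
    val rows = Seq.tabulate(k)(i => TopicRow(Array.fill(vocabSize)(i.toDouble), i))
    val ordered = rebuild(rows, vocabSize, k)
    val shuffled = rebuild(new Random(42).shuffle(rows), vocabSize, k)
    println(ordered.sameElements(shuffled)) // prints true
  }
}
```

The sketch only shows that index-keyed placement is order-independent; whether multiple output files are acceptable for external consumers is a separate question.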




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18034#discussion_r117606918
  
    I believe the point of repartition(1) is to export this to a single file for external consumption. Of course writing it succeeds, but I don't think this is what it is intended to do.




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by d0evi1 <gi...@git.apache.org>.
Github user d0evi1 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18034#discussion_r117607822
  
    I've read the code of MLlib's online LDA implementation and the paper.
    
    I found a simple way to test it. You can try this code snippet, which simulates a matrix to save:
    
    `
    import breeze.linalg.DenseMatrix
    import scala.util.Random
    // Simulate a large (vocabSize x k) topics matrix filled with random values.
    val vocabSize = 500000
    val k = 300
    val random = new Random()
    val topicsDenseMatrix = DenseMatrix.fill[Double](vocabSize, k)(random.nextDouble())
    `
    
    If Spark's configuration parameters are not set appropriately, the repartition(1) approach will fail. In the original implementation, every row holds a very long Vector, which causes the serialization problem.
    
    Another question: why not save the transposed matrix?
    
    Can someone check the original purpose of using repartition(1)? Thanks!




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by Krimit <gi...@git.apache.org>.
Github user Krimit commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18034#discussion_r124916809
  
    @srowen what's your concern with saving multiple files instead of one?
    
    We've encountered crippling errors saving large LDA models in the ML API as well. The problem there is even worse, since the entire matrix is treated as a single datum.




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18034#discussion_r117792674
  
    I think the PR is trying to do something similar to https://github.com/apache/spark/pull/9989/files.




[GitHub] spark pull request #18034: [SPARK-20797][MLLIB]fix LocalLDAModel.save() bug.

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18034#discussion_r117443669
  
    The problem is that this writes multiple files. I don't think you can do that.

