Posted to user@spark.apache.org by Nan Zhu <zh...@gmail.com> on 2014/01/08 04:23:48 UTC

ship MatrixFactorizationModel with each partition?

Hi, all  

I'm trying the ALS implementation in MLlib.

The following is my code:

val result = als.run(ratingRDD)
val allMovies = ratingRDD.map(rating => rating.product).distinct()
val allUsers = ratingRDD.map(rating => rating.user).distinct()
val allUserMoviePair = allUsers.cartesian(allMovies)
// score every (user, movie) pair; note that predict is called inside the map
val resultRDD = allUserMoviePair.map(userMoviePair => {
  var str = ""
  str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
    result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
  str
})



Every time, result.predict throws an exception like:

scala.MatchError: null
	at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
	at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
	at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
	at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
	at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
	at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
	at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
	at org.apache.spark.scheduler.Task.run(Task.scala:53)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:724)

If I change the code to pull everything into an array in the driver program with collect(), it works:

val resultRDD = allUserMoviePair.collect().map(userMoviePair => {
  var str = ""
  str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
    result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
  str
})


So the exception seems to be related to how the MatrixFactorizationModel is shared with each partition?

Can anyone give me a hint?

Thank you very much!

--  
Nan Zhu


Re: ship MatrixFactorizationModel with each partition?

Posted by Nan Zhu <zh...@gmail.com>.
Hi, Matei,   

Do you mean that when we transform one RDD, the closure should not reference any other RDDs?
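
For instance, would even a toy case like this fail for the same reason? (A made-up example, not my real code; sc is an existing SparkContext.)

val rddA = sc.parallelize(1 to 3)
val rddB = sc.parallelize(4 to 6)
// rddB is captured in the closure that ships to the executors,
// where the RDD is not usable, so this fails at runtime
val broken = rddA.map(x => rddB.filter(_ > x).count())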

Best,  

--  
Nan Zhu



On Wednesday, January 8, 2014 at 12:33 AM, Matei Zaharia wrote:

> Sorry, you actually can’t call predict() on the cluster because the model contains some RDDs. There was a recent patch that added a parallel predict method, here: https://github.com/apache/incubator-spark/pull/328/files. You can grab the code from that method there (which does a join) and call that yourself in Spark 0.8.x.
>  
> Matei
>  
> On Jan 7, 2014, at 10:23 PM, Nan Zhu <zhunanmcgill@gmail.com> wrote:
> > […]


Re: ship MatrixFactorizationModel with each partition?

Posted by Nan Zhu <zh...@gmail.com>.
Great, thank you, Matei!

--  
Nan Zhu



On Wednesday, January 8, 2014 at 12:33 AM, Matei Zaharia wrote:

> […]


Re: ship MatrixFactorizationModel with each partition?

Posted by Matei Zaharia <ma...@gmail.com>.
Sorry, you actually can’t call predict() on the cluster because the model contains some RDDs. There was a recent patch that added a parallel predict method, here: https://github.com/apache/incubator-spark/pull/328/files. You can grab the code from that method there (which does a join) and call that yourself in Spark 0.8.x.
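
For reference, the core of that method looks roughly like this (an untested sketch, not the exact PR code: predictAll is a made-up name, it takes the model's two factor RDDs as arguments, and it uses a plain-Scala dot product instead of jblas):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.Rating

def predictAll(usersProducts: RDD[(Int, Int)],
               userFeatures: RDD[(Int, Array[Double])],
               productFeatures: RDD[(Int, Array[Double])]): RDD[Rating] = {
  // attach each user's feature vector to its (user, product) pairs, re-keyed by product
  val byProduct = userFeatures.join(usersProducts).map {
    case (user, (uFeatures, product)) => (product, (user, uFeatures))
  }
  // attach each product's feature vector, then score each pair with a dot product
  byProduct.join(productFeatures).map {
    case (product, ((user, uFeatures), pFeatures)) =>
      val score = uFeatures.zip(pFeatures).map { case (u, p) => u * p }.sum
      Rating(user, product, score)
  }
}

So instead of calling predict per element on the executors, you make one distributed call, something like predictAll(allUserMoviePair, ...) with the model's userFeatures and productFeatures.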

Matei

On Jan 7, 2014, at 10:23 PM, Nan Zhu <zh...@gmail.com> wrote:

> […]