Posted to user@spark.apache.org by Nan Zhu <zh...@gmail.com> on 2014/01/08 04:23:48 UTC
ship MatrixFactorizationModel with each partition?
Hi, all
I'm trying ALS in MLlib.
The following is my code:
val result = als.run(ratingRDD)
val allMovies = ratingRDD.map(rating => rating.product).distinct()
val allUsers = ratingRDD.map(rating => rating.user).distinct()
val allUserMoviePair = allUsers.cartesian(allMovies)
// result.predict is called inside an RDD closure, i.e. on the executors
val resultRDD = allUserMoviePair.map(userMoviePair => {
  var str = ""
  str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
    result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
  str
})
Every time, result.predict throws an exception like this:
scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
If I change the code to collect the pairs into a local array in the driver program, it works:
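// collect() returns a local Array, so predict runs on the driver
// (despite its name, resultRDD is now a local Array[String])
val resultRDD = allUserMoviePair.collect().map(userMoviePair => {
  var str = ""
  str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
    result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
  str
})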
So is the exception related to how the MatrixFactorizationModel is shared with each partition?
Can anyone give me a hint?
Thank you very much!
--
Nan Zhu
Re: ship MatrixFactorizationModel with each partition?
Posted by Nan Zhu <zh...@gmail.com>.
Hi, Matei,
Do you mean that when we transform an RDD, the closure should not reference other RDDs?
Best,
--
Nan Zhu
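To illustrate the rule behind this question: a transformation's closure can only use plain serializable values, not another RDD, because RDD operations such as `lookup` are launched from the driver. One common workaround, a different technique from the join Matei suggests and only viable when both factor tables fit in driver memory, is to collect the model's `userFeatures` and `productFeatures` RDDs to the driver and broadcast them as ordinary maps. This is a minimal sketch assuming the `org.apache.spark` package layout of Spark 0.8 and later; `BroadcastPredict` and `predictWithBroadcast` are hypothetical names.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

// Hypothetical helper: scores (user, product) pairs without calling any RDD
// operation inside a task closure.
object BroadcastPredict {
  def predictWithBroadcast(
      sc: SparkContext,
      model: MatrixFactorizationModel,
      usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
    // Runs on the driver: materialize both factor tables as local maps
    // (assumes they fit in driver memory) and ship them to the executors once.
    val userFactors = sc.broadcast(model.userFeatures.collectAsMap())
    val productFactors = sc.broadcast(model.productFeatures.collectAsMap())

    // Runs on the executors: the closure only touches broadcast values.
    usersProducts.map { case (user, product) =>
      // Throws NoSuchElementException if an id is missing from the model.
      val u = userFactors.value(user)
      val p = productFactors.value(product)
      Rating(user, product, u.zip(p).map { case (a, b) => a * b }.sum)
    }
  }
}
```

For the code in the original post this would be called as `BroadcastPredict.predictWithBroadcast(sc, result, allUserMoviePair)`, where `sc` is the application's SparkContext.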
On Wednesday, January 8, 2014 at 12:33 AM, Matei Zaharia wrote:
> Sorry, you actually can’t call predict() on the cluster because the model contains some RDDs. There was a recent patch that added a parallel predict method, here: https://github.com/apache/incubator-spark/pull/328/files. You can grab the code from that method there (which does a join) and call that yourself in Spark 0.8.x.
>
> Matei
Re: ship MatrixFactorizationModel with each partition?
Posted by Nan Zhu <zh...@gmail.com>.
Great, thank you, Matei.
--
Nan Zhu
Re: ship MatrixFactorizationModel with each partition?
Posted by Matei Zaharia <ma...@gmail.com>.
Sorry, you actually can’t call predict() on the cluster because the model contains some RDDs. There was a recent patch that added a parallel predict method, here: https://github.com/apache/incubator-spark/pull/328/files. You can grab the code from that method there (which does a join) and call that yourself in Spark 0.8.x.
Matei
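The join-based approach can be sketched as follows: instead of calling `predict(user, product)` once per pair inside a closure, join the pairs against the model's `userFeatures` and `productFeatures` RDDs and take a dot product per pair. This is a minimal sketch of the idea, not the code from the linked pull request; `JoinPredict` and `predictAll` are hypothetical names, and it assumes `userFeatures` and `productFeatures` are the model's public `RDD[(Int, Array[Double])]` fields.

```scala
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

// Hypothetical helper: batch prediction built only from RDD joins, so every
// RDD operation is issued from the driver.
object JoinPredict {
  def predictAll(model: MatrixFactorizationModel, usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
    // Attach each user's factor vector, then re-key the result by product id.
    val byProduct = model.userFeatures.join(usersProducts).map {
      case (user, (userVector, product)) => (product, (user, userVector))
    }
    // Attach each product's factor vector and score the pair with a dot product.
    byProduct.join(model.productFeatures).map {
      case (product, ((user, userVector), productVector)) =>
        val score = userVector.zip(productVector).map { case (a, b) => a * b }.sum
        Rating(user, product, score)
    }
  }
}
```

Applied to the original post, `JoinPredict.predictAll(result, allUserMoviePair)` yields an `RDD[Rating]` that can be formatted with `map(r => r.user + "," + r.product + "," + r.rating)` and saved without collecting the pairs to the driver.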