You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Sue Cai <ca...@hotmail.co.uk> on 2014/05/21 09:31:06 UTC

MLlib ALS-- Errors communicating with MapOutputTracker

Hello,

I am currently using MLlib ALS to process a large volume of data, about 1.2
billion Rating(userId, productId, rates) triples. The dataset was sepatated
into 4000 partitions for parallized computation on our yarn clusters. 

I encountered this error "Errors communicating with MapOutputTracker",
when trying to get the prediciton rates [model.predict(userproducts)] after
iterations.

    val predictions = model.predict(usersProducts).map{
      case Rating(user, product, rate) => ((user, product), rate)
    }

I tried to separate the iteration process and the process of culating
prediction rates value by storing the two feature matirces into file system
first; and the loading them for prediction. This time, the error occurred at
the stage of loading userFeatures. 

userfData: userId:[0.3,0.5,0.002,.....]

  val userfTuple =userfData.map{
      case (line) => {
        val arr = line.split(splitmark_1)
        val featureArr = arr(1).split(splitmark_2)
        (arr(0),featureArr)
      }
    }

Here is part of the log:
----------------------------------------------------------------------------------------------------------------------------------------------------------------

14-05-21 14:37:17 WARN [Result resolver thread-0] TaskSetManager: Loss was
due to org.apache.spark.SparkException
org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at
org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:79)
        at
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:126)
        at
org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:244)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:235)
        at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:90)
        at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:89)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:727)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:723)
        at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:220)
        at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)


-------------------------------------------------------------------------------------------------------------------------------------------------------------------

I have tried several methods to solve this problem, one way was to decrease
the number of partitions(from 4000 to 3000), another was to increase the
memory of masters. Both worked, but it is still vital to track the
underneath causes there, right? 

Could anyone help me to check this problem? Thanks a lot. 

Sue Cai




--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/MLlib-ALS-Errors-communicating-with-MapOutputTracker-tp6740.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Re: Re:MLlib ALS-- Errors communicating with MapOutputTracker

Posted by Sue Cai <ca...@hotmail.co.uk>.
Hi Witgo,

     Thanks a lot for your reply.

     In my second test, the user features and product features were loaded
from the file system directly,which means I did not use ALS here, and this
problem happened at the loading data stage. The way I am asking the question
was a little bit miss leading, this problem might not be ALS specific.

    As far as I had discovered, it might related to the usage of master
memory, since I could solve this problem by increase the master memory size,
or reduce the number of partitions. But I am still waiting for a better
explanation :)

   Thank you again.

Sue   



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/MLlib-ALS-Errors-communicating-with-MapOutputTracker-tp6740p6758.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Re:MLlib ALS-- Errors communicating with MapOutputTracker

Posted by witgo <wi...@qq.com>.
Lack of hard disk space? If yes, you can try https://github.com/apache/spark/pull/828




------------------ Original ------------------
From:  "Sue Cai";<ca...@hotmail.co.uk>;
Date:  Wed, May 21, 2014 03:31 PM
To:  "dev"<de...@spark.incubator.apache.org>; 

Subject:  MLlib ALS-- Errors communicating with MapOutputTracker



Hello,

I am currently using MLlib ALS to process a large volume of data, about 1.2
billion Rating(userId, productId, rates) triples. The dataset was sepatated
into 4000 partitions for parallized computation on our yarn clusters. 

I encountered this error "Errors communicating with MapOutputTracker",
when trying to get the prediciton rates [model.predict(userproducts)] after
iterations.

    val predictions = model.predict(usersProducts).map{
      case Rating(user, product, rate) => ((user, product), rate)
    }

I tried to separate the iteration process and the process of culating
prediction rates value by storing the two feature matirces into file system
first; and the loading them for prediction. This time, the error occurred at
the stage of loading userFeatures. 

userfData: userId:[0.3,0.5,0.002,.....]

  val userfTuple =userfData.map{
      case (line) => {
        val arr = line.split(splitmark_1)
        val featureArr = arr(1).split(splitmark_2)
        (arr(0),featureArr)
      }
    }

Here is part of the log:
----------------------------------------------------------------------------------------------------------------------------------------------------------------

14-05-21 14:37:17 WARN [Result resolver thread-0] TaskSetManager: Loss was
due to org.apache.spark.SparkException
org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at
org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:79)
        at
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:126)
        at
org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:244)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:235)
        at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:90)
        at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:89)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:727)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:723)
        at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:220)
        at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)


-------------------------------------------------------------------------------------------------------------------------------------------------------------------

I have tried several methods to solve this problem, one way was to decrease
the number of partitions(from 4000 to 3000), another was to increase the
memory of masters. Both worked, but it is still vital to track the
underneath causes there, right? 

Could anyone help me to check this problem? Thanks a lot. 

Sue Cai




--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/MLlib-ALS-Errors-communicating-with-MapOutputTracker-tp6740.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
.